[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r201971238 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- Got it, thanks for your 
explanation. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r201941972 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- Hi @Aitozi, in case of 
the current implementation of list state in RocksDB, you are right. But there was, for example, an effort to make lists scalable like maps in RocksDB, and the returned `Iterable` could be lazy in that case. This TTL implementation does not make any assumptions about the underlying state backend. The generic state user API returns `Iterable`, which in general can be lazy, so the TTL wrapper tries to keep it lazy wherever possible. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r201895445 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- Hi @azagrebin , little 
doubt about what you said: > return Iterable and avoid querying backend if not needed But when dealing with ListState, `original.get()` has already queried the original `Iterable` from RocksDB, hasn't it? Does this approach just lazily iterate over elements that are already in memory? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6186 ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r199109435 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * This class wraps map state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry key of state with TTL + * @param Type of the user entry value of state with TTL + */ +class TtlMapState + extends AbstractTtlState, Map>, InternalMapState>> + implements InternalMapState { + TtlMapState( + InternalMapState> original, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(original, config, timeProvider, valueSerializer); + } + + @Override + public UV get(UK key) throws Exception { + return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); + } + + @Override + public void put(UK key, UV value) throws Exception { + original.put(key, wrapWithTs(value)); + } + + @Override + public void putAll(Map map) throws Exception { + if (map == null) { + return; + } + Map> ttlMap = new HashMap<>(); --- End diff -- We can already initialize the new map with `map.size()`. ---
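A minimal sketch of the suggestion above, assuming a hypothetical `TimestampedValue` class as a stand-in for the PR's `TtlValue` and a hypothetical helper `wrapAll`; neither name comes from the PR. It only illustrates passing `map.size()` to the `HashMap` constructor before wrapping each entry.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the PR's TtlValue: a user value plus an expiration timestamp. */
final class TimestampedValue<V> {
    final V userValue;
    final long expirationTimestamp;

    TimestampedValue(V userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

final class PresizedPutAll {
    /** Wraps every entry of {@code map} with the given expiration timestamp. */
    static <K, V> Map<K, TimestampedValue<V>> wrapAll(Map<K, V> map, long expirationTimestamp) {
        // Use the known number of entries as the initial capacity, as suggested in the review.
        Map<K, TimestampedValue<V>> wrapped = new HashMap<>(map.size());
        for (Map.Entry<K, V> e : map.entrySet()) {
            wrapped.put(e.getKey(), new TimestampedValue<>(e.getValue(), expirationTimestamp));
        }
        return wrapped;
    }
}
```

With the default load factor of 0.75, a `HashMap` created with capacity `map.size()` can still resize once while it is being filled; presizing reduces the number of resizes rather than guaranteeing none.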
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r199082008 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { --- End diff -- My idea was to keep this method lazy when possible. Currently this implementation does not assume that underlying state returns `List` but rather works as if it is `Iterable` and lazily wraps it with `IteratorWithCleanup`. Only when it has to update timestamps on read, it materialises `Iterable` to `List`. ---
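The lazy wrapping described above can be sketched roughly as follows. This is not the PR's `IteratorWithCleanup`; the class `LazyUnexpiredView` and its `expired` predicate are hypothetical names used only to show how an `Iterable` can filter expired elements on the fly without first materialising a `List`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical lazy view that skips expired elements while iterating. */
final class LazyUnexpiredView<T> implements Iterable<T> {
    private final Iterable<T> source;
    private final Predicate<T> expired;

    LazyUnexpiredView(Iterable<T> source, Predicate<T> expired) {
        this.source = source;
        this.expired = expired;
    }

    @Override
    public Iterator<T> iterator() {
        final Iterator<T> it = source.iterator();
        return new Iterator<T>() {
            private T buffered;
            private boolean hasBuffered;

            @Override
            public boolean hasNext() {
                // Pull from the source only until the next unexpired element is found.
                while (!hasBuffered && it.hasNext()) {
                    T candidate = it.next();
                    if (!expired.test(candidate)) {
                        buffered = candidate;
                        hasBuffered = true;
                    }
                }
                return hasBuffered;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                hasBuffered = false;
                T result = buffered;
                buffered = null;
                return result;
            }
        };
    }
}
```

Nothing is read from `source` until the caller starts iterating, which is the property the comment wants to preserve for backends whose `Iterable` is itself lazy.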
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198947650 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.FoldFunction; + +/** + * This class wraps folding function with TTL logic. + * + * @param Type of the values folded into the state + * @param Type of the value in the state + * + * @deprecated use {@link TtlAggregateFunction} instead + */ +@Deprecated +class TtlFoldFunction + extends AbstractTtlDecorator> + implements FoldFunction> { + TtlFoldFunction(FoldFunction original, TtlConfig config, TtlTimeProvider timeProvider) { + super(original, config, timeProvider); + } + + @Override + public TtlValue fold(TtlValue accumulator, T value) throws Exception { + return wrapWithTs(original.fold(getUnexpried(accumulator), value)); --- End diff -- It should be covered with `updateExpired` in `testExactExpirationOnWrite` ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198757244 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.FoldFunction; + +/** + * This class wraps folding function with TTL logic. + * + * @param Type of the values folded into the state + * @param Type of the value in the state + * + * @deprecated use {@link TtlAggregateFunction} instead + */ +@Deprecated +class TtlFoldFunction + extends AbstractTtlDecorator> + implements FoldFunction> { + TtlFoldFunction(FoldFunction original, TtlConfig config, TtlTimeProvider timeProvider) { + super(original, config, timeProvider); + } + + @Override + public TtlValue fold(TtlValue accumulator, T value) throws Exception { + return wrapWithTs(original.fold(getUnexpried(accumulator), value)); --- End diff -- As I understand, the wrapped states should already provide the default values. 
My idea was to wrap the original default value [in TTL factory](https://github.com/apache/flink/pull/6196/commits/4dedb9a20244a2addd337617778b17fe8349#diff-13011fbe7c28b56b994783572b461aaeR174) with expiration timestamp `Long.MAX_VALUE`, basically never expiring. Good point about test cases for it, I will add them for appending states. ---
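To illustrate why an expiration timestamp of `Long.MAX_VALUE` behaves as "never expiring", here is a small sketch. The class `DefaultTtlSketch` is hypothetical; the comparison mirrors the `expired` check from `AbstractTtlDecorator` quoted elsewhere in this thread (`expirationTimestamp <= currentTimestamp`).

```java
final class DefaultTtlSketch {
    /** Expiration timestamp proposed in the comment for a wrapped default value. */
    static final long NEVER = Long.MAX_VALUE;

    /** Same comparison as the quoted expired check: expired once the timestamp is reached. */
    static boolean expired(long expirationTimestamp, long now) {
        return expirationTimestamp <= now;
    }
}
```

Any realistic clock value is smaller than `Long.MAX_VALUE`, so the check returns `false` for a default wrapped this way. One thing to watch in such a scheme is arithmetic on the sentinel: adding a TTL to `Long.MAX_VALUE` would overflow.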
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198751723 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * This class wraps map state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry key of state with TTL + * @param Type of the user entry value of state with TTL + */ +class TtlMapState + extends AbstractTtlState, Map>, InternalMapState>> + implements InternalMapState { + TtlMapState( + InternalMapState> original, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(original, config, timeProvider, valueSerializer); + } + + @Override + public UV get(UK key) throws Exception { + return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); + } + + @Override + public void put(UK key, UV value) throws Exception { + original.put(key, wrapWithTs(value)); + } + + @Override + public void putAll(Map map) throws Exception { + if (map == null) { + return; + } + Map> ttlMap = map.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue()))); + original.putAll(ttlMap); + } + + @Override + public void remove(UK key) throws Exception { + original.remove(key); + } + + @Override + public boolean contains(UK key) throws Exception { + return get(key) != null; + } + + @Override + public Iterable> entries() throws Exception { + return entriesStream()::iterator; + } + + private Stream> entriesStream() throws Exception { + Iterable>> withTs = original.entries(); + withTs = withTs == null ? Collections.emptyList() : withTs; + return StreamSupport + .stream(withTs.spliterator(), false) --- End diff -- As I understand it, this depends on the use case. For parallelizable, lazy operations over big collections, like filter and map over lists, streams will give a boost over loops, but for short collections or non-parallelizable spliterators the overhead kills the performance. It might be hard to predict the type of spliterator used, though. I agree that real benchmarking should be done to make sure. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198439894 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { --- End diff -- Currently this class is used to wrap any object with TTL logic, not only state objects: e.g. aggregating functions. It can be injected as a "TTL time service" but then the common member `original` will have to be duplicated in function wrappers. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198439148 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. + */ +public enum TtlStateVisibility { + /** Return still available expired user value (not yet cleaned up). */ + Relaxed, + /** Hide expired user value and behave as if it does not exist any more. */ + Exact --- End diff -- I think I will also rename enum entries. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198436490 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option value configures when to prolong state TTL. + */ +public enum TtlUpdateType { + /** TTL is disabled. State does not expire. */ + Disabled, --- End diff -- This is more of an internal configuration value, meant for later use in the TTL wrapper factory to decide whether to wrap with TTL or not. The alternative would probably be to store a null or `Optional` config, or a negative TTL, in the state descriptor when it is configured without TTL by default. I found the enum value more explicit, although it is user-facing; I expect users will simply not configure TTL at all rather than use this option. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198431042 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + final long ttl; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + this.ttl = config.getTtl().toMilliseconds(); + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { --- End diff -- It actually wraps with expiration timestamp, TTL is not stored with value. ---
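A small sketch of the point made above: the value is stored together with its expiration timestamp, not with the TTL itself. The class `ExpirationSketch`, its nested `Stamped` type, and the manually advanced clock are hypothetical; computing the expiration as "current time plus TTL" is an assumption, since the body of `wrapWithTs` is not shown in the quoted diff. The `expired` and `getUnexpired` logic follows the quoted decorator code.

```java
final class ExpirationSketch {
    /** Hypothetical counterpart of TtlValue: stores the expiration timestamp, not the TTL. */
    static final class Stamped<V> {
        final V userValue;
        final long expirationTimestamp;

        Stamped(V userValue, long expirationTimestamp) {
            this.userValue = userValue;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    private final long ttlMillis;
    private final boolean returnExpired; // corresponds to the "Relaxed" visibility in the diff
    private long now;                    // manually controlled clock, for illustration only

    ExpirationSketch(long ttlMillis, boolean returnExpired, long startTime) {
        this.ttlMillis = ttlMillis;
        this.returnExpired = returnExpired;
        this.now = startTime;
    }

    void advance(long millis) {
        now += millis;
    }

    /** Assumption: the expiration timestamp is the current time plus the configured TTL. */
    <V> Stamped<V> wrapWithTs(V value) {
        return new Stamped<>(value, now + ttlMillis);
    }

    <V> boolean expired(Stamped<V> stamped) {
        return stamped != null && stamped.expirationTimestamp <= now;
    }

    /** Returns null for a missing value, or for an expired one unless expired values are visible. */
    <V> V getUnexpired(Stamped<V> stamped) {
        return stamped == null || (expired(stamped) && !returnExpired) ? null : stamped.userValue;
    }
}
```

Because only the absolute timestamp is stored, changing the configured TTL later would not affect values that were already written under this scheme.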
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r198430741 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder --- End diff -- Yes, config is only sketched. It is planned for later step when TTL will be activated in a State Descriptor API. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197719642 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.runtime.state.internal.InternalListState; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** Test suite for {@link TtlListState}. 
*/ +public class TtlListStateTest extends TtlStateTestBase, List, Iterable> { + @Override + TtlListState createState() { + return new TtlListState<>(new MockInternalListState<>(), ttlConfig, timeProvider, null); + } + + @Override + void initTestValues() { + updater = v -> ttlState.addAll(v); + getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList()); + originalGetter = () -> ttlState.original.get(); + + emptyValue = Collections.emptyList(); + + updateValue1 = Arrays.asList(5, 7, 10); + updateValue2 = Arrays.asList(8, 9, 11); + + getValue1 = updateValue1; + getValue2 = updateValue2; + } + + private static class MockInternalListState + extends MockInternalKvState> + implements InternalListState { + + MockInternalListState() { + value = new ArrayList<>(); + } + + @Override + public void update(List elements) { + updateInternal(elements); + } + + @Override + public void addAll(List elements) { + value.addAll(elements); + } + + @Override + public void mergeNamespaces(N target, Collection sources) { --- End diff -- Wouldn't it make sense to check that merging namespaces also works correctly with the TTL state, and to pin down and validate the contract for what has to happen to the timestamps in this case? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197719310 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +abstract class TtlStateTestBase { --- End diff -- I think it would make sense to extend this general test a bit to consider multiple keys and namespaces. Ideally, a test should really test the full contract specification of the tested subject. What I mean is, you could currently pass this test even if TTL accidentally cleared all states on the timeout of one state, or cleared all the states in the same namespace. The mock states can easily be extended to truly scope values by key and namespace. 
Then the test can, for example, create two keys in the same namespace and two keys in a different namespace, and check that their timeouts are isolated from each other and that the interaction works as expected. ---
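The suggestion above could be approached roughly as follows. This is only a sketch of a mock that scopes its value by the current key and namespace; `ScopedMockState` and all of its members are hypothetical names, and the real `MockInternalKvState` in the PR may be structured differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical mock: keeps one value per (key, namespace) pair, as a keyed state backend would. */
final class ScopedMockState<K, N, V> {
    private final Map<Scope<K, N>, V> values = new HashMap<>();
    private K currentKey;
    private N currentNamespace;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    void setCurrentNamespace(N namespace) {
        this.currentNamespace = namespace;
    }

    V get() {
        return values.get(scope());
    }

    void update(V value) {
        values.put(scope(), value);
    }

    /** Clears only the value stored under the current key and namespace. */
    void clear() {
        values.remove(scope());
    }

    private Scope<K, N> scope() {
        return new Scope<>(currentKey, currentNamespace);
    }

    /** Composite map key made of the state key and the namespace. */
    private static final class Scope<K, N> {
        private final K key;
        private final N namespace;

        Scope(K key, N namespace) {
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Scope)) {
                return false;
            }
            Scope<?, ?> other = (Scope<?, ?>) o;
            return Objects.equals(key, other.key) && Objects.equals(namespace, other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, namespace);
        }
    }
}
```

A test built on such a mock could write values under two keys in one namespace and two keys in another, let only one of them expire, and then assert that the other three are still readable.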
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197714098 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { --- End diff -- Currently this could return `List` instead of `Iterable`, and you might get around the `instanceof` check. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197503773 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalAggregatingState; + +import java.util.Collection; + +/** + * This class wraps aggregating state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the value added to the state + * @param The type of the accumulator (intermediate aggregate state). 
+ * @param Type of the value extracted from the state + * + */ +class TtlAggregatingState + extends AbstractTtlState, InternalAggregatingState, OUT>> + implements InternalAggregatingState { + + TtlAggregatingState( + InternalAggregatingState, OUT> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer valueSerializer, + TtlAggregateFunction aggregateFunction) { + super(originalState, config, timeProvider, valueSerializer); + aggregateFunction.stateClear = originalState::clear; --- End diff -- Maybe it is better to pass a `TtlAggregateFunctionBuilder` and then supply `stateClear` and `updater` before the object is created. I think then they can also become immutable. Similar changes in the other states. ---
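One way to read the builder suggestion: collect the callbacks first, then construct the function object once, so that its fields can be `final`. The sketch below is an assumption about the general shape only. `TtlFunctionSketch`, its `Builder`, and the `Runnable`/`Consumer` callback types are stand-ins chosen to keep the example self-contained; they are not the PR's `TtlAggregateFunction` API.

```java
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical stand-in for a TTL function whose callbacks are fixed at construction time. */
final class TtlFunctionSketch<V> {
    private final Runnable stateClear;
    private final Consumer<V> updater;

    private TtlFunctionSketch(Runnable stateClear, Consumer<V> updater) {
        this.stateClear = stateClear;
        this.updater = updater;
    }

    void clearState() {
        stateClear.run();
    }

    void update(V value) {
        updater.accept(value);
    }

    /** Collects the callbacks; build() fails fast if one of them was never supplied. */
    static final class Builder<V> {
        private Runnable stateClear;
        private Consumer<V> updater;

        Builder<V> stateClear(Runnable stateClear) {
            this.stateClear = stateClear;
            return this;
        }

        Builder<V> updater(Consumer<V> updater) {
            this.updater = updater;
            return this;
        }

        TtlFunctionSketch<V> build() {
            return new TtlFunctionSketch<>(
                Objects.requireNonNull(stateClear, "stateClear"),
                Objects.requireNonNull(updater, "updater"));
        }
    }
}
```

The state constructor would then receive the builder, supply `originalState::clear` and the updater, and call `build()`, instead of assigning mutable fields on an already constructed function.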
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197502165 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.FoldFunction; + +/** + * This class wraps folding function with TTL logic. + * + * @param Type of the values folded into the state + * @param Type of the value in the state + * + * @deprecated use {@link TtlAggregateFunction} instead + */ +@Deprecated +class TtlFoldFunction + extends AbstractTtlDecorator> + implements FoldFunction> { + TtlFoldFunction(FoldFunction original, TtlConfig config, TtlTimeProvider timeProvider) { + super(original, config, timeProvider); + } + + @Override + public TtlValue fold(TtlValue accumulator, T value) throws Exception { + return wrapWithTs(original.fold(getUnexpried(accumulator), value)); --- End diff -- If this is true, that is a valuable case to test for all the appending states, to verify that they work correctly. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197501902 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.FoldFunction; + +/** + * This class wraps folding function with TTL logic. + * + * @param Type of the values folded into the state + * @param Type of the value in the state + * + * @deprecated use {@link TtlAggregateFunction} instead + */ +@Deprecated +class TtlFoldFunction + extends AbstractTtlDecorator> + implements FoldFunction> { + TtlFoldFunction(FoldFunction original, TtlConfig config, TtlTimeProvider timeProvider) { + super(original, config, timeProvider); + } + + @Override + public TtlValue fold(TtlValue accumulator, T value) throws Exception { + return wrapWithTs(original.fold(getUnexpried(accumulator), value)); --- End diff -- I think that (similar to `TtlAggregateFunction`) you need to intercept `null` values of the accumulator here and replace them by the default value. ---
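The concern here is that once the stored accumulator has expired, the unwrapping step yields `null`, and the user's fold function would then be called with a `null` accumulator. A minimal sketch of the interception follows, assuming the default value is available to the wrapper. `FoldWithDefaultSketch` and `foldOrDefault` are hypothetical names, and `BiFunction` stands in for Flink's `FoldFunction` so the example compiles on its own.

```java
import java.util.function.BiFunction;

/** Illustrative only: replaces a missing (for example expired) accumulator by a default before folding. */
final class FoldWithDefaultSketch {
    private FoldWithDefaultSketch() {}

    static <T, ACC> ACC foldOrDefault(
            BiFunction<ACC, T, ACC> fold,
            ACC unexpiredAccumulator,
            ACC defaultAccumulator,
            T value) {
        // A null accumulator means "nothing stored or already expired": start over from the default.
        ACC accumulator = unexpiredAccumulator == null ? defaultAccumulator : unexpiredAccumulator;
        return fold.apply(accumulator, value);
    }
}
```

How the default value reaches `TtlFoldFunction` (constructor argument, state descriptor, or something else) is left open here; the sketch only shows the substitution itself.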
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197498126 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * This class wraps map state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry key of state with TTL + * @param Type of the user entry value of state with TTL + */ +class TtlMapState + extends AbstractTtlState, Map>, InternalMapState>> + implements InternalMapState { + TtlMapState( + InternalMapState> original, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(original, config, timeProvider, valueSerializer); + } + + @Override + public UV get(UK key) throws Exception { + return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); + } + + @Override + public void put(UK key, UV value) throws Exception { + original.put(key, wrapWithTs(value)); + } + + @Override + public void putAll(Map map) throws Exception { + if (map == null) { + return; + } + Map> ttlMap = map.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue()))); + original.putAll(ttlMap); + } + + @Override + public void remove(UK key) throws Exception { + original.remove(key); + } + + @Override + public boolean contains(UK key) throws Exception { + return get(key) != null; + } + + @Override + public Iterable> entries() throws Exception { + return entriesStream()::iterator; + } + + private Stream> entriesStream() throws Exception { + Iterable>> withTs = original.entries(); + withTs = withTs == null ? 
Collections.emptyList() : withTs; + return StreamSupport + .stream(withTs.spliterator(), false) + .filter(this::unexpiredAndUpdateOrCleanup) + .map(TtlMapState::dropTs); + } + + private boolean unexpiredAndUpdateOrCleanup(Map.Entry> e) { + UV unexpiredValue; + try { + unexpiredValue = getWithTtlCheckAndUpdate( + e::getValue, + v -> original.put(e.getKey(), v), + () -> original.remove(e.getKey())); + } catch (Exception ex) { + throw new FlinkRuntimeException(ex); + } + return unexpiredValue != null; + } + + private static Map.Entry dropTs(Map.Entry> e) { --- End diff -- Again, to keep it more clear I would spell out `dropTs` as `unwrapTtlState` or something similar. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197494851 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * This class wraps map state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry key of state with TTL + * @param Type of the user entry value of state with TTL + */ +class TtlMapState + extends AbstractTtlState, Map>, InternalMapState>> + implements InternalMapState { + TtlMapState( + InternalMapState> original, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(original, config, timeProvider, valueSerializer); + } + + @Override + public UV get(UK key) throws Exception { + return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); + } + + @Override + public void put(UK key, UV value) throws Exception { + original.put(key, wrapWithTs(value)); + } + + @Override + public void putAll(Map map) throws Exception { + if (map == null) { + return; + } + Map> ttlMap = map.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue()))); + original.putAll(ttlMap); + } + + @Override + public void remove(UK key) throws Exception { + original.remove(key); + } + + @Override + public boolean contains(UK key) throws Exception { + return get(key) != null; + } + + @Override + public Iterable> entries() throws Exception { + return entriesStream()::iterator; + } + + private Stream> entriesStream() throws Exception { + Iterable>> withTs = original.entries(); + withTs = withTs == null ? Collections.emptyList() : withTs; + return StreamSupport + .stream(withTs.spliterator(), false) --- End diff -- I saw that some of the classes make heavy use of streams by getting spliterators from collections. While the code is concise, this creates some default adapter from iterator to spliterator under the hood. IIRC this can have significant performance impact, especially if used with hot code paths. State access in Flink can be considered a hot path for some cases. 
It is hard to quantify the impact just from looking at it, but when using this kind of API adapter in "low-level" classes, please be aware of the potential impact. We might want to look at the performance and tune if needed. This should be OK for now because there is no regression in existing code, but we might want to measure this for heap-based state eventually. ---
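If measurements ever show the stream adapter to matter, the filtering step could be written as a plain loop over the `Iterable`. The sketch below shows the idea on simplified types: each entry's value is taken to be its expiration timestamp, and `PlainLoopFilterSketch` with `unexpiredKeys` is a hypothetical helper, not code from the PR. Whether this is measurably faster is not claimed here and would need a benchmark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative only: filters entries with an explicit loop instead of StreamSupport.stream(...). */
final class PlainLoopFilterSketch {
    private PlainLoopFilterSketch() {}

    /**
     * Returns the keys whose expiration timestamp (the entry value in this simplified model)
     * is still in the future, using the same "expired if ts <= now" rule as the quoted diff.
     */
    static <K> List<K> unexpiredKeys(Iterable<Map.Entry<K, Long>> entries, long currentTs) {
        List<K> result = new ArrayList<>();
        for (Map.Entry<K, Long> entry : entries) {
            if (entry.getValue() > currentTs) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

The trade-off is eager collection into a list versus the lazy iteration that the stream version provides, so this would not be a drop-in replacement for `entries()` without further thought.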
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197482482 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + final long ttl; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + this.ttl = config.getTtl().toMilliseconds(); + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { + return wrapWithTs(value, newExpirationTimestamp()); + } + + static TtlValue wrapWithTs(V value, long ts) { + return value == null ? null : new TtlValue<>(value, ts); + } + +TtlValue rewrapWithNewTs(TtlValue ttlValue) { + return wrapWithTs(ttlValue.getUserValue()); + } + + private long newExpirationTimestamp() { + long currentTs = timeProvider.currentTimestamp(); + long ttlWithoutOverflow = currentTs > 0 ? Math.min(Long.MAX_VALUE - currentTs, ttl) : ttl; + return currentTs + ttlWithoutOverflow; + } + + V getWithTtlCheckAndUpdate( --- End diff -- Seems also like this method almost fits better into `AbstractTtlState`, for example you can access `clear()` directly. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197480035 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { --- End diff -- I wonder whether it is better to have this as an abstract base class or to prefer composition over inheritance for `AbstractTtlState`. It has no abstract methods, and no methods are overridden by subclasses; it could just as well be a non-abstract class held as a reference in `AbstractTtlState`. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197477831 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. --- End diff -- Whether expired user value "can" be returned. We accept it in relaxed mode, we don't enforce it. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197476858 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. + */ +public enum TtlStateVisibility { + /** Return still available expired user value (not yet cleaned up). */ + Relaxed, + /** Hide expired user value and behave as if it does not exist any more. */ + Exact --- End diff -- And here: expired state is never returned to the user. I think it does not matter if it is just hidden, or deleted. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197476710 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. + */ +public enum TtlStateVisibility { + /** Return still available expired user value (not yet cleaned up). */ + Relaxed, --- End diff -- I would explain this differently in the comment. It means that expired state can be returned if it is not yet cleaned up. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197475873 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option value configures when to prolong state TTL. + */ +public enum TtlUpdateType { + /** TTL is disabled. State does not expire. */ + Disabled, --- End diff -- Seems like this is never needed? Why would somebody register TTL state and then declare it disabled? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197475080 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option value configures when to prolong state TTL. + */ +public enum TtlUpdateType { --- End diff -- Why not declare the enums in the `TtlConfig` class, where they belong? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197475154 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures whether to return expired user value or not. + */ +public enum TtlStateVisibility { --- End diff -- Same here. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197475245 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeCharacteristic.java --- @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +/** + * This option configures time scale to use for ttl. + */ +public enum TtlTimeCharacteristic { --- End diff -- Same here. ---
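The suggestion above, declaring the option enums inside the configuration class, could look roughly like the sketch below. It is a simplified stand-in and not the PR's code: `TtlConfigSketch` and the shortened enum names are hypothetical, the TTL is a plain `long` in milliseconds instead of Flink's `Time`, and the `Disabled` update type is left out in line with the earlier review comment. The constants `OnCreateAndWrite` and `ProcessingTime` do not appear in the quoted diffs and are assumptions made for illustration.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for TtlConfig with its option enums nested. */
final class TtlConfigSketch {
    /** When to prolong the TTL. OnCreateAndWrite is an assumed constant name. */
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }

    /** Whether expired but not yet cleaned-up values may be returned. */
    enum StateVisibility { Relaxed, Exact }

    /** Time scale used for the TTL. ProcessingTime is an assumed constant name. */
    enum TimeCharacteristic { ProcessingTime }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final long ttlMillis;

    TtlConfigSketch(
            UpdateType updateType,
            StateVisibility stateVisibility,
            TimeCharacteristic timeCharacteristic,
            long ttlMillis) {
        this.updateType = Objects.requireNonNull(updateType);
        this.stateVisibility = Objects.requireNonNull(stateVisibility);
        this.timeCharacteristic = Objects.requireNonNull(timeCharacteristic);
        this.ttlMillis = ttlMillis;
    }

    UpdateType getUpdateType() { return updateType; }

    StateVisibility getStateVisibility() { return stateVisibility; }

    TimeCharacteristic getTimeCharacteristic() { return timeCharacteristic; }

    long getTtlMillis() { return ttlMillis; }
}
```

With this layout, callers refer to the options as `TtlConfigSketch.StateVisibility.Exact` and so on, which keeps the options discoverable next to the class that consumes them.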
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197474567 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder --- End diff -- Was this forgotten or planned for a later commit? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197460522 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + final long ttl; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + this.ttl = config.getTtl().toMilliseconds(); + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { --- End diff -- Better `wrapWithTtl` ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197459764 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + final long ttl; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + this.ttl = config.getTtl().toMilliseconds(); + } + +V getUnexpried(TtlValue ttlValue) { --- End diff -- This method does two things: checking the ttl and unwrapping the value. I would make this as two methods to separate concerns and make it more readable. ---
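One way to read the suggestion above is sketched here: the visibility check and the unwrapping become separate methods, and the combined accessor only composes them. This is a minimal sketch under assumptions, not the PR's implementation. `TtlValueSketch`, `TtlReadSketch`, `isVisible`, and `unwrap` are hypothetical names, and a `LongSupplier` stands in for the PR's `TtlTimeProvider`.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the PR's TtlValue: a user value plus its expiration timestamp. */
final class TtlValueSketch<V> {
    final V userValue;
    final long expirationTimestamp;

    TtlValueSketch(V userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Hypothetical sketch of the read path with the two concerns separated. */
final class TtlReadSketch {
    private final boolean returnExpired; // true corresponds to the Relaxed visibility
    private final LongSupplier clock;    // stand-in for TtlTimeProvider

    TtlReadSketch(boolean returnExpired, LongSupplier clock) {
        this.returnExpired = returnExpired;
        this.clock = clock;
    }

    /** Same rule as the quoted diff: expired once the timestamp is at or before "now". */
    <V> boolean expired(TtlValueSketch<V> ttlValue) {
        return ttlValue != null && ttlValue.expirationTimestamp <= clock.getAsLong();
    }

    /** Concern 1: may this wrapped value be shown to the user at all? */
    <V> boolean isVisible(TtlValueSketch<V> ttlValue) {
        return ttlValue != null && (returnExpired || !expired(ttlValue));
    }

    /** Concern 2: unwrap the user value, tolerating null. */
    static <V> V unwrap(TtlValueSketch<V> ttlValue) {
        return ttlValue == null ? null : ttlValue.userValue;
    }

    /** Composition that keeps the behaviour of the original single method. */
    <V> V getUnexpired(TtlValueSketch<V> ttlValue) {
        return isVisible(ttlValue) ? unwrap(ttlValue) : null;
    }
}
```

The composed method returns `null` exactly when the original condition `ttlValue == null || (expired(ttlValue) && !returnExpired)` holds, so the split changes readability only.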
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197458219 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. + * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; --- End diff -- A bit more field comments might be good, especially about the exact meanings of those boolean flags. For example, it is not super obvious what `returnExpired` means for the code. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197457017 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + final long ttl; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + this.ttl = config.getTtl().toMilliseconds(); + } + +V getUnexpried(TtlValue ttlValue) { --- End diff -- typo in method name ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197192870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); + Preconditions.checkArgument(ttl.toMilliseconds() >= 0, --- End diff -- Maybe we should first check that `ttl` is not null. I also wonder whether `ttl.toMilliseconds() == 0` makes any sense. ---
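A minimal sketch of the stricter validation raised above, under two assumptions that the thread itself leaves open: that a null TTL should fail fast with its own message, and that a TTL of zero is rejected as meaningless. `TtlValidationSketch` and `validatedTtlMillis` are hypothetical names, and `java.time.Duration` stands in for Flink's `Time` so the example runs without Flink on the classpath.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical helper showing a null check followed by a strictly positive check. */
final class TtlValidationSketch {
    /** Returns the TTL in milliseconds, or throws if it is null or not positive. */
    static long validatedTtlMillis(Duration ttl) {
        // Check for null first so the failure is not an unexplained NullPointerException
        // raised from inside the range check.
        Objects.requireNonNull(ttl, "TTL must not be null.");
        long millis = ttl.toMillis();
        if (millis <= 0) {
            throw new IllegalArgumentException(
                "TTL is expected to be positive, but was " + millis + " ms.");
        }
        return millis;
    }
}
```

If a zero TTL turns out to have a legitimate meaning, the comparison would go back to `< 0`; the null check is independent of that decision.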
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197188305 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- Okay, I see this is tricky, I agree that this should be addressed in another PR. We need to figure out a proper way to do that. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197179238 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- This operation fits more for checkpoint full scan restoration with custom transformation of each state entry where expiration timestamp is optionally prolonged for downtime. The same as cleanup of expired state during full scan. I think it should be another issue and PR, because how can wrappers distinguish between old and new state before and after restart? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197114442 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- Additionally, if we don't align the time on recovery, then what is a safe `TTL` value to set for a job? (This is the question users always ask us when they try to use the `TTL` to control the state's size; we implemented it in a hacky way based on `TtlDB`.) ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197111980 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- I'm really not sure whether we should leave it for now. If we do, it will be a headache in production. A very common situation: a job reads data from Kafka, and the user sets `TTL = 2hours` because he believes the data's latency is definitely less than 2 hours; this way he can use the TTL to safely control the overall state size and still get an exact result. But if he finds that the job needs to scale up, he needs to trigger a savepoint and rescale the job from it. 
But what if some problem stops him from recovering the job from the savepoint quickly? Say it takes 30 minutes to recover the job; then the result becomes inaccurate. Even if the user never needs to trigger a savepoint, what if the job runs into some problem (maybe with some machine) and loops in "failed-restart-failed-.."? After 2 hours we fix the problem and the job automatically resumes, but all of the state has expired. I think this is a disaster for the user. Yes, people using `EventTime` won't have this problem, but `ProcessingTime` is a very common use case (in our production, most jobs use `ProcessingTime` as their time characteristic). I know Flink's TimeService also doesn't align the time on recovery, but state TTL is a bit different from a timer. When registering a timer, the user gives the API an absolute time; when setting the TTL, the user gives only a relative time, and it is we who convert the relative time to an absolute time to implement the TTL. ---
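The arithmetic behind this concern can be made concrete with a small sketch. It is illustrative only: `TtlDowntimeSketch` and its methods are hypothetical names, timestamps are plain processing-time milliseconds, and `prolongForDowntime` shows just one conceivable alignment (shifting the expiration timestamp by the downtime), not something this PR implements.

```java
/** Hypothetical illustration of relative TTL versus absolute expiration timestamps. */
final class TtlDowntimeSketch {
    /** The wrapper turns the user's relative TTL into an absolute timestamp at write time. */
    static long expirationTimestamp(long writeTimeMillis, long ttlMillis) {
        return writeTimeMillis + ttlMillis;
    }

    /** Same rule as in the quoted decorator: expired once the timestamp is at or before "now". */
    static boolean expired(long expirationTimestampMillis, long nowMillis) {
        return expirationTimestampMillis <= nowMillis;
    }

    /** One conceivable alignment on restore: shift the expiration by the time the job was down. */
    static long prolongForDowntime(long expirationTimestampMillis, long downtimeMillis) {
        return expirationTimestampMillis + downtimeMillis;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long ttl = 2 * hour;                 // the user asks for "TTL = 2 hours"
        long expiration = expirationTimestamp(0L, ttl);

        long stoppedAt = hour;               // the job fails one hour after the write
        long downtime = 2 * hour;            // and stays down for two hours
        long resumedAt = stoppedAt + downtime;

        // The job only processed data for one hour, yet the entry is already expired.
        System.out.println(expired(expiration, resumedAt)); // true
        // With the expiration shifted by the downtime, the entry survives the restart.
        System.out.println(expired(prolongForDowntime(expiration, downtime), resumedAt)); // false
    }
}
```

A timer registered at an absolute time has no such gap, because the user chose the absolute value; here the absolute value is derived, which is why downtime silently consumes the user's TTL budget.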
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197110068 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { + return 
StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList()); + } + + @Override + public void updateInternal(List valueToStore) throws Exception { + Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null."); + original.addAll(withTs(valueToStore)); --- End diff -- This should be `original.updateInternal(withTs(valueToStore));`, I will change it ---
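The difference that this fix addresses is the one between appending and replacing. The sketch below uses a tiny in-memory stand-in, `ListStateSketch`, which is a hypothetical class and not Flink's `InternalListState`; it only mirrors the contract that `addAll` appends to the stored list while `update` replaces it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a list state, showing append versus replace. */
final class ListStateSketch<T> {
    private final List<T> elements = new ArrayList<>();

    /** Appends the given values to whatever is already stored. */
    void addAll(List<T> values) {
        elements.addAll(values);
    }

    /** Replaces the stored values with the given list. */
    void update(List<T> values) {
        elements.clear();
        elements.addAll(values);
    }

    /** Returns a copy of the stored values. */
    List<T> get() {
        return new ArrayList<>(elements);
    }
}
```

If an update-style method delegates to `addAll`, values written earlier stay in the list after every "update", so the list keeps growing; delegating to the replacing call avoids that.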
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197109577 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { + return wrapWithTs(value, newExpirationTimestamp()); + } + + static TtlValue wrapWithTs(V value, long ts) { + return value == null ? null : new TtlValue<>(value, ts); + } + +TtlValue rewrapWithNewTs(TtlValue ttlValue) { + return wrapWithTs(ttlValue.getUserValue()); + } + + private long newExpirationTimestamp() { + long currentTs = timeProvider.currentTimestamp(); + long ttl = config.getTtl().toMilliseconds(); --- End diff -- I think it will be optimised by JIT as an only usage, but I will do just in case ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197108498 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- I think this is out of TTL scope now. Merge namespace methods are not part of API which immediately face user. 
It is used only internally for windowing, and I am not sure it will ever need TTL. Anyway, the read part of the API should eventually evict expired state, merged or not. For aggregating states, merge already uses the wrapped reading methods from the aggregate functions underneath, which should also speed up cleanup if ever used with `mergeNamespaces`. ---
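The point that the read path hides expired entries regardless of how they got into the state can be sketched with a filtering iterator. This is an illustration under assumptions: `SkippingIterator` is a hypothetical class, not the PR's `IteratorWithCleanup`, and it only skips entries on read; removing them from the backend is not shown.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical iterator that skips elements for which 'expired' returns true. */
class SkippingIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private final Predicate<T> expired;
    private T buffered;
    private boolean hasBuffered;

    SkippingIterator(Iterator<T> source, Predicate<T> expired) {
        this.source = source;
        this.expired = expired;
    }

    @Override
    public boolean hasNext() {
        // Advance lazily until an unexpired element is buffered or the source is exhausted.
        while (!hasBuffered && source.hasNext()) {
            T candidate = source.next();
            if (!expired.test(candidate)) {
                buffered = candidate;
                hasBuffered = true;
            }
        }
        return hasBuffered;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = buffered;
        buffered = null;
        hasBuffered = false;
        return result;
    }
}
```

Because the filtering happens while the caller iterates, nothing is read from the underlying iterator until it is actually needed.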
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197102118 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- Well, not sure, it matters 
at this point, it is private and used just once with a collected `List`. My thinking was that it is less verbose: `list.stream()` vs `StreamSupport.stream(iterable.spliterator(), false)`. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197080576 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- If we need to "update on 
read", then this is a bit confusing to me. Currently we attach a TTL to every list item, so "update on read" should be scoped to the list item, not the whole list. So it seems to me that an `iterable` parameter for `updateTs` would be more reasonable. What do you think? ---
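A per-item "update on read" can be sketched as follows. This is only an illustration under assumptions: `TtlEntry` and `refreshUnexpired` are hypothetical names, and the expiry rule (`expirationTs <= now` means expired) is taken from the `expired` method quoted elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical value-plus-expiration pair, standing in for a TTL-wrapped list item. */
class TtlEntry<T> {
    final T value;
    final long expirationTs;

    TtlEntry(T value, long expirationTs) {
        this.value = value;
        this.expirationTs = expirationTs;
    }
}

class PerItemRefreshDemo {
    /**
     * Walks any Iterable of entries, drops those expired at 'now',
     * and gives each surviving entry a fresh expiration of now + ttl.
     */
    static <T> List<TtlEntry<T>> refreshUnexpired(Iterable<TtlEntry<T>> entries, long now, long ttl) {
        List<TtlEntry<T>> result = new ArrayList<>();
        for (TtlEntry<T> entry : entries) {
            if (entry.expirationTs > now) { // expirationTs <= now counts as expired
                result.add(new TtlEntry<>(entry.value, now + ttl));
            }
        }
        return result;
    }
}
```

Taking an `Iterable` here means the caller does not have to materialize a `List` first; each item is checked and refreshed individually.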
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197079917 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- I think it is a bit out of scope for this PR. I thought about this concern, but the semantics of processing time are similar to those of real clock time, which does not stop when the job is stopped. There is also the event time option, which gives more control over time. I would leave it for now. This is more about user migration of checkpoints. We can add a comment/open question to the design doc about it. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197077377 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- In general the idea is to 
keep the `get()` method as lazy as possible: return an `Iterable` and avoid querying the backend if not needed. At the moment list state internally stores a `List` according to the API, not an `Iterable` (something to think about in the future). So `getInternal()` returns a `List`, i.e. the collected `Iterable`. I agree it looks weird, but if `get()` calls `getInternal()`, it forces collecting the `List`. If we do not need update on read, everything can still stay lazy in the final resulting `IteratorWithCleanup` - one more potential user iteration over original `Iterable` from
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197001110 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalReducingState; + +import java.util.Collection; + +/** + * This class wraps reducing state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user value of state with TTL + */ +class TtlReducingState + extends AbstractTtlState, InternalReducingState>> + implements InternalReducingState { + TtlReducingState( + InternalReducingState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public T get() throws Exception { + return getInternal(); + } + + @Override + public void add(T value) throws Exception { + original.add(wrapWithTs(value, Long.MAX_VALUE)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- Again, Should we also do the TTL check for original.mergeNamespaces()? Since we need to query the state when merging namespaces. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196995820 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- Again, should we also do the `TTL` check for `original.mergeNamespaces()`? Since we need to query the state when merging namespaces. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196998472 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { --- End diff -- If we've called 
`getInternal()` in `get()`, and made `updateTs()` accept an `Iterable`, then it seems this method could be removed (or at least we should check whether the `iterable` is an instance of `List`; if so, we could cast it to `List` and return it immediately). ---
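The suggested shortcut could look roughly like this. It is a sketch, not the PR's code: `CollectDemo` is a hypothetical holder class, and the fallback branch reuses the `StreamSupport` call quoted in the diff.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class CollectDemo {
    /** Returns the argument itself if it already is a List, otherwise copies it into a new List. */
    static <E> List<E> collect(Iterable<E> iterable) {
        if (iterable instanceof List) {
            // Already materialized: avoid a second copy.
            return (List<E>) iterable;
        }
        return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
    }
}
```

One trade-off to keep in mind: with the shortcut the caller gets the original list instance back, so later mutations of that list are visible through the returned reference.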
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197004891 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * This class wraps user value of state with TTL. + * + * @param Type of the user value of state with TTL + */ +class TtlValue implements Serializable { + private final T userValue; + private final long expirationTimestamp; --- End diff -- The `expirationTimestamp` is an absolute timestamp; should we shift the timestamps of `TtlValue` on checkpoint and recovery? For example, suppose a user uses `ProcessingTime` as the `TimeCharacteristic` and sets `TTL = 10min`. For some reason, they trigger a savepoint and recover the job from it 11 minutes later. If we don't shift the timestamps, then all the state will be expired. ---
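The scenario can be worked through with concrete numbers. This is a sketch only: `SavepointTtlDemo` and its `shift` method are hypothetical and describe the idea being asked about, not anything in the PR; the `expired` rule mirrors the `expirationTimestamp <= currentTimestamp` check from `AbstractTtlDecorator`.

```java
class SavepointTtlDemo {
    static final long MINUTE = 60_000L;

    /** Same rule as in the diff: a value is expired once its expiration timestamp is reached. */
    static boolean expired(long expirationTs, long now) {
        return expirationTs <= now;
    }

    /** Hypothetical shift: moves the expiration forward by the time the job was down. */
    static long shift(long expirationTs, long savepointTs, long restoreTs) {
        return expirationTs + (restoreTs - savepointTs);
    }

    public static void main(String[] args) {
        long writeTs = 0L;
        long expirationTs = writeTs + 10 * MINUTE;    // TTL = 10 min
        long savepointTs = 1 * MINUTE;                // savepoint taken after 1 min
        long restoreTs = savepointTs + 11 * MINUTE;   // job restored 11 min later

        // Without a shift the value is already expired at restore time.
        System.out.println(expired(expirationTs, restoreTs)); // true

        // With the hypothetical shift the downtime does not count against the TTL.
        long shifted = shift(expirationTs, savepointTs, restoreTs);
        System.out.println(expired(shifted, restoreTs)); // false
    }
}
```

Whether downtime should count against the TTL is a semantic choice; the sketch only shows what each option would mean for the example in the comment.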
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r197001339 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalAggregatingState; + +import java.util.Collection; + +/** + * This class wraps aggregating state with TTL logic. + * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the value added to the state + * @param The type of the accumulator (intermediate aggregate state). 
+ * @param Type of the value extracted from the state + * + */ +class TtlAggregatingState + extends AbstractTtlState, InternalAggregatingState, OUT>> + implements InternalAggregatingState { + + TtlAggregatingState( + InternalAggregatingState, OUT> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer valueSerializer, + TtlAggregateFunction aggregateFunction) { + super(originalState, config, timeProvider, valueSerializer); + aggregateFunction.stateClear = originalState::clear; + aggregateFunction.updater = originalState::updateInternal; + } + + @Override + public OUT get() throws Exception { + return original.get(); + } + + @Override + public void add(IN value) throws Exception { + original.add(value); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public ACC getInternal() throws Exception { + return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal); + } + + @Override + public void updateInternal(ACC valueToStore) throws Exception { + original.updateInternal(wrapWithTs(valueToStore)); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); --- End diff -- Should we also do the TTL check for `original.mergeNamespaces()`, since we need to query the state when merging namespaces? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196996094 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); --- End diff -- This looks a bit weird, my gut feeling is that we should call `getInternal()` 
in `get()` (as we call `updateInternal()` in `update()` in this class), but here it is the reverse. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196996839 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); --- End diff -- In this block we iterate over `ttlValue` twice, once in `collect()` and once in `updateTs()`. If `updateTs()` accepted an `Iterable` as its argument, we could avoid the `collect()` here and iterate over `ttlValue` only once. ---
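A minimal sketch of the single-pass idea suggested above, written outside Flink so it stays self-contained. `Stamped` is a hypothetical stand-in for `TtlValue`, and `refreshUnexpired` is an assumed name; it filters expired entries and rewraps the rest with a new timestamp in one loop over an `Iterable`, without materializing the input first.

```java
import java.util.ArrayList;
import java.util.List;

public class SinglePassSketch {
    // Hypothetical stand-in for TtlValue: a user value plus an absolute expiration timestamp.
    public static final class Stamped {
        public final String value;
        public final long expiration;

        public Stamped(String value, long expiration) {
            this.value = value;
            this.expiration = expiration;
        }
    }

    // One pass over the input: drop expired entries and give the survivors a fresh expiration.
    public static List<Stamped> refreshUnexpired(Iterable<Stamped> values, long now, long ttlMillis) {
        List<Stamped> refreshed = new ArrayList<>();
        for (Stamped v : values) {
            if (v.expiration > now) { // same rule as expired(): expiration <= now means expired
                refreshed.add(new Stamped(v.value, now + ttlMillis));
            }
        }
        return refreshed;
    }
}
```

Note one behavioral difference from the diff: there, `get()` hands the originally collected entries to the returned iterator, whereas a single-pass variant would naturally return the refreshed list.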
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196995095 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); --- End diff -- Maybe we should also check that the `ttl` is greater than 0? ---
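A positivity check like the one suggested above could look as follows. This is only a sketch: it uses `java.time.Duration` instead of Flink's `Time` so that it compiles on its own, and `checkTtl` is a hypothetical helper, not part of the PR.

```java
import java.time.Duration;
import java.util.Objects;

public class TtlCheckSketch {
    // Rejects null, zero, and negative TTLs; returns the argument so it can be used inline in a constructor.
    public static Duration checkTtl(Duration ttl) {
        Objects.requireNonNull(ttl, "TTL must not be null.");
        if (ttl.isZero() || ttl.isNegative()) {
            throw new IllegalArgumentException("TTL is expected to be positive.");
        }
        return ttl;
    }

    public static void main(String[] args) {
        System.out.println(checkTtl(Duration.ofMinutes(10)).toMillis());
    }
}
```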
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196827863 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; --- End diff -- Oh sorry, my bad, I misunderstood. ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196826339 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; --- End diff -- `ttlValue` is reassigned before the lambda in the return statement, so it is no longer effectively final and cannot be captured by the lambda. That is why `finalResult` is formally needed: it avoids a compilation error. ---
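The rule explained above can be reproduced in isolation. The sketch below uses hypothetical names (`sizeSupplier`, `finalValues`) and plain JDK types; it shows that a local variable which has been reassigned cannot be captured by a lambda, while a fresh `final` copy can.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class EffectivelyFinalSketch {
    public static Supplier<Integer> sizeSupplier(List<String> input) {
        List<String> values = input;
        if (values == null) {
            values = Collections.emptyList(); // reassignment: 'values' is no longer effectively final
        }
        // return () -> values.size();       // would not compile: captured local must be (effectively) final
        final List<String> finalValues = values; // assigned exactly once, so it may be captured
        return () -> finalValues.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeSupplier(null).get());
    }
}
```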
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196820474 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { + return wrapWithTs(value, newExpirationTimestamp()); + } + + static TtlValue wrapWithTs(V value, long ts) { + return value == null ? null : new TtlValue<>(value, ts); + } + +TtlValue rewrapWithNewTs(TtlValue ttlValue) { + return wrapWithTs(ttlValue.getUserValue()); + } + + private long newExpirationTimestamp() { + long currentTs = timeProvider.currentTimestamp(); + long ttl = config.getTtl().toMilliseconds(); --- End diff -- This will be called very often, so does it make sense to introduce a field that caches `config.getTtl().toMilliseconds()`? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196809755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); + } + +TtlValue wrapWithTs(V value) { + return wrapWithTs(value, newExpirationTimestamp()); + } + + static TtlValue wrapWithTs(V value, long ts) { + return value == null ? null : new TtlValue<>(value, ts); + } + +TtlValue rewrapWithNewTs(TtlValue ttlValue) { + return wrapWithTs(ttlValue.getUserValue()); + } + + private long newExpirationTimestamp() { + return timeProvider.currentTimestamp() + config.getTtl().toMilliseconds(); --- End diff -- This will be called very often, so does it make sense to introduce a field that caches `config.getTtl().toMilliseconds()`? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196817846 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; --- End diff -- The variable `finalResult` looks redundant, or am I misunderstanding something? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196816259 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** + * This class wraps list state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry value of state with TTL + */ +class TtlListState extends + AbstractTtlState, List>, InternalListState>> + implements InternalListState { + TtlListState( + InternalListState> originalState, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(originalState, config, timeProvider, valueSerializer); + } + + @Override + public void update(List values) throws Exception { + updateInternal(values); + } + + @Override + public void addAll(List values) throws Exception { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + @Override + public Iterable get() throws Exception { + Iterable> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + if (updateTsOnRead) { + List> collected = collect(ttlValue); + ttlValue = collected; + updateTs(collected); + } + final Iterable> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + private void updateTs(List> ttlValue) throws Exception { + List> unexpiredWithUpdatedTs = ttlValue.stream() + .filter(v -> !expired(v)) + .map(this::rewrapWithNewTs) + .collect(Collectors.toList()); + if (!unexpiredWithUpdatedTs.isEmpty()) { + original.update(unexpiredWithUpdatedTs); + } + } + + @Override + public void add(T value) throws Exception { + Preconditions.checkNotNull(value, "You cannot add null to a ListState."); + original.add(wrapWithTs(value)); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void mergeNamespaces(N target, Collection sources) throws Exception { + original.mergeNamespaces(target, sources); + } + + @Override + public List getInternal() throws Exception { + return collect(get()); + } + + private List collect(Iterable iterable) { + return 
StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList()); + } + + @Override + public void updateInternal(List valueToStore) throws Exception { + Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null."); + original.addAll(withTs(valueToStore)); --- End diff -- This seems to be missing a `clear()`: as written, the update appends to the existing list instead of replacing it. ---
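The difference between appending and replacing can be seen in a tiny stand-alone sketch. `ListUpdateSketch` is a hypothetical in-memory stand-in for a list state, not the Flink class; it assumes `update()` is meant to replace the stored list, which is what the comment above implies.

```java
import java.util.ArrayList;
import java.util.List;

public class ListUpdateSketch {
    private final List<String> backing = new ArrayList<>();

    // Appends to whatever is already stored.
    public void addAll(List<String> values) {
        backing.addAll(values);
    }

    // Replaces the stored list: clear first, then add. Without the clear() this would behave like addAll().
    public void update(List<String> values) {
        backing.clear();
        backing.addAll(values);
    }

    // Returns a copy so callers cannot mutate the stored list.
    public List<String> get() {
        return new ArrayList<>(backing);
    }
}
```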
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user azagrebin commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196819320 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param Type of originally wrapped object + */ +abstract class AbstractTtlDecorator { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + +V getUnexpried(TtlValue ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + +boolean expired(TtlValue ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- In general, Flink allows event time to be in the negative range, but overflow in the case of a very large TTL should be checked. A TTL only makes sense if it is non-negative. I have added a fix for this. ---
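The fix mentioned above is not shown in this thread, so the following is only one possible way to guard the addition, with hypothetical names. It accepts negative current timestamps (as event time may be negative), rejects a negative TTL, and saturates at `Long.MAX_VALUE` instead of wrapping around when the sum would overflow.

```java
public class ExpirationSketch {
    // Computes currentTs + ttlMillis, saturating at Long.MAX_VALUE rather than overflowing.
    public static long expirationTimestamp(long currentTs, long ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("TTL must be non-negative.");
        }
        // With ttlMillis >= 0, overflow is only possible when currentTs is positive.
        boolean overflows = currentTs > 0 && ttlMillis > Long.MAX_VALUE - currentTs;
        return overflows ? Long.MAX_VALUE : currentTs + ttlMillis;
    }

    public static void main(String[] args) {
        System.out.println(expirationTimestamp(1_000L, 600_000L));
        System.out.println(expirationTimestamp(Long.MAX_VALUE - 5, 10L));
    }
}
```

Saturating means a value with a huge TTL effectively never expires under the `expirationTimestamp <= now` check, which seems to match the intent of a very large TTL.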
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196791555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.util.Preconditions; + +/** + * Configuration of state TTL logic. + * TODO: builder + */ +public class TtlConfig { + private final TtlUpdateType ttlUpdateType; + private final TtlStateVisibility stateVisibility; + private final TtlTimeCharacteristic timeCharacteristic; + private final Time ttl; + + public TtlConfig( + TtlUpdateType ttlUpdateType, + TtlStateVisibility stateVisibility, + TtlTimeCharacteristic timeCharacteristic, + Time ttl) { + Preconditions.checkNotNull(ttlUpdateType); + Preconditions.checkNotNull(stateVisibility); + Preconditions.checkNotNull(timeCharacteristic); --- End diff -- Why not check `ttl` as well? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196786370 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param <T> Type of originally wrapped object + */ +abstract class AbstractTtlDecorator<T> { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + + <V> V getUnexpried(TtlValue<V> ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + + <V> boolean expired(TtlValue<V> ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- Does it make sense to never expire the value when `ttlValue.getExpirationTimestamp()` returns a negative value? ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r196784275 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * Base class for TTL logic wrappers. 
+ * + * @param <T> Type of originally wrapped object + */ +abstract class AbstractTtlDecorator<T> { + final T original; + final TtlConfig config; + final TtlTimeProvider timeProvider; + final boolean updateTsOnRead; + final boolean returnExpired; + + AbstractTtlDecorator( + T original, + TtlConfig config, + TtlTimeProvider timeProvider) { + Preconditions.checkNotNull(original); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(timeProvider); + Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled, + "State does not need to be wrapped with TTL if it is configured as disabled."); + this.original = original; + this.config = config; + this.timeProvider = timeProvider; + this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite; + this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed; + } + + <V> V getUnexpried(TtlValue<V> ttlValue) { + return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); + } + + <V> boolean expired(TtlValue<V> ttlValue) { + return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp(); --- End diff -- This looks a bit problematic, because `ttlValue.getExpirationTimestamp()` might be negative. E.g., when the user provides `Long.MAX_VALUE` as the TTL value, they expect the value never to expire, but with the current code the addition overflows to a negative timestamp and the value expires immediately. ---
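The overflow described in this comment can be reproduced in isolation. The sketch below is not the fix that went into the PR; it assumes that the expiration timestamp is computed as last access time plus TTL, and `TtlOverflowSketch`, `naiveExpiration` and `safeExpiration` are hypothetical names introduced only for illustration. The saturating variant clamps the result to `Long.MAX_VALUE` so that a very large TTL behaves as "never expires", while negative event-time timestamps remain valid inputs.

```java
// Hypothetical helper, not part of Flink: contrasts a wrapping and a saturating
// computation of the expiration timestamp.
final class TtlOverflowSketch {

    /** Plain addition: silently wraps around to a negative value for a very large TTL. */
    public static long naiveExpiration(long lastAccessTs, long ttl) {
        return lastAccessTs + ttl;
    }

    /** Saturating addition: clamps to Long.MAX_VALUE instead of overflowing. */
    public static long safeExpiration(long lastAccessTs, long ttl) {
        if (ttl < 0) {
            throw new IllegalArgumentException("ttl must be non-negative");
        }
        // With ttl >= 0 the sum can only overflow upwards, and only if
        // lastAccessTs exceeds the remaining headroom below Long.MAX_VALUE.
        long headroom = Long.MAX_VALUE - ttl;
        return lastAccessTs > headroom ? Long.MAX_VALUE : lastAccessTs + ttl;
    }

    /** Same comparison as in the reviewed expired(...) method. */
    public static boolean expired(long expirationTs, long now) {
        return expirationTs <= now;
    }

    public static void main(String[] args) {
        long now = 1_528_000_000_000L; // an arbitrary positive timestamp in milliseconds
        long ttl = Long.MAX_VALUE;     // the user intends "never expire"

        System.out.println(expired(naiveExpiration(now, ttl), now)); // prints "true"
        System.out.println(expired(safeExpiration(now, ttl), now));  // prints "false"
    }
}
```

`Math.addExact` would be an alternative if throwing an `ArithmeticException` is preferred over clamping; clamping is chosen here because it matches the "never expire" expectation described in the comment.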
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
GitHub user azagrebin opened a pull request: https://github.com/apache/flink/pull/6186 [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers ## What is the purpose of the change This PR introduces TTL logic wrappers for state objects. ## Brief change log Added - sketch of TtlConfig - AbstractTtlWrapper and AbstractTtlState - concrete TTL wrappers for state objects ## Verifying this change Unit tests for state objects TTL wrappers ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (not applicable at this step) You can merge this pull request into a Git repository by running: $ git pull https://github.com/azagrebin/flink FLINK-9515 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6186.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6186 commit 62faa8ee220c21fa824fec690073c27a0a994be5 Author: Andrey Zagrebin Date: 2018-06-04T15:28:40Z [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers ---