[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544054#comment-16544054 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6331

> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Andrey Zagrebin
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542045#comment-16542045 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6313
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542040#comment-16542040 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6313

    LGTM, nice work! Apart from one comment about closing the backends after tests, the PR is ready. It is a minor issue, so I will fix it myself before merging.
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542039#comment-16542039 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202130806

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java ---
    @@ -0,0 +1,120 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.ttl;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.checkpoint.StateObjectCollection;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
    +import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
    +import org.apache.flink.runtime.state.CheckpointStorageLocation;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyedStateHandle;
    +import org.apache.flink.runtime.state.SnapshotResult;
    +import org.apache.flink.runtime.state.StateBackend;
    +import org.apache.flink.runtime.state.internal.InternalKvState;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.concurrent.RunnableFuture;
    +
    +/** Base class for state backend test context. */
    +public abstract class StateBackendTestContext {
    +    private final StateBackend stateBackend;
    +    private final CheckpointStorageLocation checkpointStorageLocation;
    +    private final TtlTimeProvider timeProvider;
    +
    +    private AbstractKeyedStateBackend keyedStateBackend;
    +
    +    protected StateBackendTestContext(TtlTimeProvider timeProvider) {
    +        this.timeProvider = Preconditions.checkNotNull(timeProvider);
    +        this.stateBackend = Preconditions.checkNotNull(createStateBackend());
    +        this.checkpointStorageLocation = createCheckpointStorageLocation();
    +    }
    +
    +    protected abstract StateBackend createStateBackend();
    +
    +    private CheckpointStorageLocation createCheckpointStorageLocation() {
    +        try {
    +            return stateBackend
    +                .createCheckpointStorage(new JobID())
    +                .initializeLocationForCheckpoint(2L);
    +        } catch (IOException e) {
    +            throw new RuntimeException("unexpected");
    +        }
    +    }
    +
    +    void createAndRestoreKeyedStateBackend() {
    +        Environment env = new DummyEnvironment();
    +        try {
    +            if (keyedStateBackend != null) {
    +                keyedStateBackend.dispose();
    --- End diff --

    The problem is that the backend is only disposed here and not after each test, which leads to native errors when I run the tests. I suggest giving this context a `dispose` method and calling it in an `@After` method.
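The suggestion in the comment above (a `dispose` method on the context, called after every test) can be sketched roughly as follows. This is only an illustration in plain Java without a JUnit dependency: `FakeBackend`, `BackendTestContextSketch` and `TtlTestLifecycleSketch` are hypothetical stand-ins, not Flink classes, and in a real JUnit 4 test the `tearDown()` method would carry the `@After` annotation.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java imitation of a test class; with JUnit 4, tearDown() would be annotated with @After. */
class TtlTestLifecycleSketch {
    final BackendTestContextSketch ctx = new BackendTestContextSketch();

    void testBody() {
        ctx.createAndRestoreBackend();
        ctx.createAndRestoreBackend(); // simulates a restore within one test
    }

    void tearDown() {
        ctx.dispose();
    }

    public static void main(String[] args) {
        TtlTestLifecycleSketch test = new TtlTestLifecycleSketch();
        test.testBody();
        System.out.println("all disposed before tearDown: " + test.ctx.allDisposed());
        test.tearDown();
        System.out.println("all disposed after tearDown: " + test.ctx.allDisposed());
    }
}

/** Hypothetical stand-in for a keyed state backend that holds native resources. */
class FakeBackend {
    private boolean disposed = false;

    void dispose() {
        disposed = true;
    }

    boolean isDisposed() {
        return disposed;
    }
}

/** Hypothetical test context that owns the backend and can release it on demand. */
class BackendTestContextSketch {
    private final List<FakeBackend> created = new ArrayList<>();
    private FakeBackend current;

    /** Mirrors the quoted code: the previous backend is disposed before a new one is created. */
    void createAndRestoreBackend() {
        if (current != null) {
            current.dispose();
        }
        current = new FakeBackend();
        created.add(current);
    }

    /** The suggested addition: also release the last backend once the test is over. */
    void dispose() {
        if (current != null) {
            current.dispose();
            current = null;
        }
    }

    /** True if every backend this context ever created has been disposed. */
    boolean allDisposed() {
        return created.stream().allMatch(FakeBackend::isDisposed);
    }
}
```

Without the explicit `tearDown()` call, the backend created last in each test stays open, which is the situation the review comment describes.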
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541921#comment-16541921 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202103083

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
         }
     
         @Override
    -    public IS createState(
    +    public IS createInternalState(
    --- End diff --

    No, timers cannot use a state descriptor; they cannot extend `State`.
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541825#comment-16541825 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202081209

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
         }
     
         @Override
    -    public IS createState(
    +    public IS createInternalState(
    --- End diff --

    The method uses the state descriptor to produce backend-specific `InternalKvState`s. Is that also applicable to timers? They could be named `createInternalKvState` and `InternalKvStateFactory`.
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541744#comment-16541744 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202058484

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
         }
     
         @Override
    -    public IS createState(
    +    public IS createInternalState(
    --- End diff --

    Why are we adding `Internal` here? I would suggest calling the method `create(Internal?)KeyValueState`, because there will also be other kinds of state in the future (timers).
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541738#comment-16541738 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202057089

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---
    @@ -93,7 +94,7 @@
         private static final long serialVersionUID = -8191916350224044011L;
     
         /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
    -    public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
    +    private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
    --- End diff --

    OK, it is `PublicEvolving`, so you can keep the change.
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541735#comment-16541735 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202056626

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---
    @@ -93,7 +94,7 @@
         private static final long serialVersionUID = -8191916350224044011L;
     
         /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
    -    public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
    +    private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
    --- End diff --

    This should not be changed, because the class is a user-facing API and someone might have used the constant.
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541682#comment-16541682 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202042548

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
     
         @Override
         public Iterable> entries() throws Exception {
    -        return entriesStream()::iterator;
    +        return entries(e -> e);
         }
     
    -    private Stream> entriesStream() throws Exception {
    +    private Iterable entries(
    +        Function, R> resultMapper) throws Exception {
             Iterable>> withTs = original.entries();
    -        withTs = withTs == null ? Collections.emptyList() : withTs;
    -        return StreamSupport
    -            .stream(withTs.spliterator(), false)
    -            .filter(this::unexpiredAndUpdateOrCleanup)
    -            .map(TtlMapState::unwrapWithoutTs);
    -    }
    -
    -    private boolean unexpiredAndUpdateOrCleanup(Map.Entry> e) {
    -        UV unexpiredValue;
    -        try {
    -            unexpiredValue = getWithTtlCheckAndUpdate(
    -                e::getValue,
    -                v -> original.put(e.getKey(), v),
    -                () -> original.remove(e.getKey()));
    -        } catch (Exception ex) {
    -            throw new FlinkRuntimeException(ex);
    -        }
    -        return unexpiredValue != null;
    -    }
    -
    -    private static Map.Entry unwrapWithoutTs(Map.Entry> e) {
    -        return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
    +        return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper);
         }
     
         @Override
         public Iterable keys() throws Exception {
    -        return entriesStream().map(Map.Entry::getKey)::iterator;
    +        return entries(Map.Entry::getKey);
         }
     
         @Override
         public Iterable values() throws Exception {
    -        return entriesStream().map(Map.Entry::getValue)::iterator;
    +        return entries(Map.Entry::getValue);
         }
     
         @Override
         public Iterator> iterator() throws Exception {
    -        return entriesStream().iterator();
    +        return entries().iterator();
         }
     
         @Override
         public void clear() {
             original.clear();
         }
    +
    +    private class EntriesIterator implements Iterator {
    +        private final Iterator>> originalIterator;
    +        private final Function, R> resultMapper;
    +        private Map.Entry nextUnexpired = null;
    +        private boolean rightAfterNextIsCalled = false;
    +
    +        private EntriesIterator(
    +            @Nonnull Iterable>> withTs,
    +            @Nonnull Function, R> resultMapper) {
    +            this.originalIterator = withTs.iterator();
    +            this.resultMapper = resultMapper;
    +        }
    +
    +        @Override
    +        public boolean hasNext() {
    +            rightAfterNextIsCalled = false;
    +            while (nextUnexpired == null && originalIterator.hasNext()) {
    +                nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next());
    +            }
    +            return nextUnexpired != null;
    +        }
    +
    +        @Override
    +        public R next() {
    +            if (hasNext()) {
    +                rightAfterNextIsCalled = true;
    +                R result = resultMapper.apply(nextUnexpired);
    +                nextUnexpired = null;
    +                return result;
    +            }
    +            throw new NoSuchElementException();
    +        }
    +
    +        @Override
    +        public void remove() {
    +            if (rightAfterNextIsCalled) {
    --- End diff --

    I agree, it seems there is no good solution for this.
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541681#comment-16541681 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202041863

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
     
         @Override
         public Iterable> entries() throws Exception {
    -        return entriesStream()::iterator;
    +        return entries(e -> e);
         }
     
    -    private Stream> entriesStream() throws Exception {
    +    private Iterable entries(
    +        Function, R> resultMapper) throws Exception {
             Iterable>> withTs = original.entries();
    -        withTs = withTs == null ? Collections.emptyList() : withTs;
    -        return StreamSupport
    -            .stream(withTs.spliterator(), false)
    -            .filter(this::unexpiredAndUpdateOrCleanup)
    -            .map(TtlMapState::unwrapWithoutTs);
    -    }
    -
    -    private boolean unexpiredAndUpdateOrCleanup(Map.Entry> e) {
    -        UV unexpiredValue;
    -        try {
    -            unexpiredValue = getWithTtlCheckAndUpdate(
    -                e::getValue,
    -                v -> original.put(e.getKey(), v),
    -                () -> original.remove(e.getKey()));
    -        } catch (Exception ex) {
    -            throw new FlinkRuntimeException(ex);
    -        }
    -        return unexpiredValue != null;
    -    }
    -
    -    private static Map.Entry unwrapWithoutTs(Map.Entry> e) {
    -        return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
    +        return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper);
         }
     
         @Override
         public Iterable keys() throws Exception {
    -        return entriesStream().map(Map.Entry::getKey)::iterator;
    +        return entries(Map.Entry::getKey);
         }
     
         @Override
         public Iterable values() throws Exception {
    -        return entriesStream().map(Map.Entry::getValue)::iterator;
    +        return entries(Map.Entry::getValue);
         }
     
         @Override
         public Iterator> iterator() throws Exception {
    -        return entriesStream().iterator();
    +        return entries().iterator();
         }
     
         @Override
         public void clear() {
             original.clear();
         }
    +
    +    private class EntriesIterator implements Iterator {
    +        private final Iterator>> originalIterator;
    +        private final Function, R> resultMapper;
    +        private Map.Entry nextUnexpired = null;
    +        private boolean rightAfterNextIsCalled = false;
    +
    +        private EntriesIterator(
    +            @Nonnull Iterable>> withTs,
    +            @Nonnull Function, R> resultMapper) {
    +            this.originalIterator = withTs.iterator();
    +            this.resultMapper = resultMapper;
    +        }
    +
    +        @Override
    +        public boolean hasNext() {
    +            rightAfterNextIsCalled = false;
    +            while (nextUnexpired == null && originalIterator.hasNext()) {
    +                nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next());
    +            }
    +            return nextUnexpired != null;
    +        }
    +
    +        @Override
    +        public R next() {
    +            if (hasNext()) {
    +                rightAfterNextIsCalled = true;
    +                R result = resultMapper.apply(nextUnexpired);
    +                nextUnexpired = null;
    +                return result;
    +            }
    +            throw new NoSuchElementException();
    +        }
    +
    +        @Override
    +        public void remove() {
    +            if (rightAfterNextIsCalled) {
    --- End diff --

    The problem here is that after calling `next()` and then `hasNext()`, `originalIterator` may already have been advanced by `hasNext()` while skipping expired entries, so its current element differs from the one last returned by the wrapping iterator. This means that `remove()` cannot be delegated to it consistently with the wrapping iterator's current element.
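The interaction described in the comment above can be reproduced with a small stand-alone sketch. It is a simplified imitation of the shape of the quoted `EntriesIterator`, not the Flink code: `FilteringIteratorSketch` is a hypothetical name, negative numbers play the role of expired entries, expired elements are only skipped (not cleaned up), and throwing `IllegalStateException` when `remove()` does not directly follow `next()` is an assumption made for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Demo of the call sequence hasNext(), next(), hasNext(), remove() on a look-ahead iterator. */
class FilteringIteratorDemo {
    public static void main(String[] args) {
        // Negative numbers stand in for expired entries.
        List<Integer> data = new ArrayList<>(Arrays.asList(1, -2, 3));
        Iterator<Integer> it = new FilteringIteratorSketch<>(data.iterator(), v -> v > 0);

        it.hasNext();
        it.next();    // returns 1
        it.hasNext(); // look-ahead skips -2; the underlying iterator now sits on 3
        try {
            it.remove(); // delegating here would remove 3 instead of 1
        } catch (IllegalStateException e) {
            System.out.println("remove() rejected: " + e.getMessage());
        }
        System.out.println(data);
    }
}

/** Hypothetical wrapper that skips elements rejected by the predicate. */
class FilteringIteratorSketch<T> implements Iterator<T> {
    private final Iterator<T> original;
    private final Predicate<T> keep;
    private T nextKept = null;
    private boolean rightAfterNext = false;

    FilteringIteratorSketch(Iterator<T> original, Predicate<T> keep) {
        this.original = original;
        this.keep = keep;
    }

    @Override
    public boolean hasNext() {
        rightAfterNext = false;
        // Advance the underlying iterator until an element passes the filter.
        while (nextKept == null && original.hasNext()) {
            T candidate = original.next();
            if (keep.test(candidate)) {
                nextKept = candidate;
            }
        }
        return nextKept != null;
    }

    @Override
    public T next() {
        if (hasNext()) {
            rightAfterNext = true;
            T result = nextKept;
            nextKept = null;
            return result;
        }
        throw new NoSuchElementException();
    }

    @Override
    public void remove() {
        // Only safe while the underlying iterator still points at the element returned by next().
        if (!rightAfterNext) {
            throw new IllegalStateException("remove() is only supported directly after next()");
        }
        original.remove();
    }
}
```

Calling `remove()` directly after `next()` still works in this sketch, because at that point the underlying iterator has not been advanced past the returned element.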
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541673#comment-16541673 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202040710

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -92,6 +93,10 @@
         @Nullable
         private String queryableStateName;
     
    +    /** Name for queries against state created from this StateDescriptor. */
    +    @Nullable
    +    private StateTtlConfiguration ttlConfig;
    --- End diff --

    Will roll it back.
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541656#comment-16541656 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202036711

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
    @@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
     
         @Override
         public Iterable> entries() throws Exception {
    -        return entriesStream()::iterator;
    +        return entries(e -> e);
         }
     
    -    private Stream> entriesStream() throws Exception {
    +    private Iterable entries(
    +        Function, R> resultMapper) throws Exception {
             Iterable>> withTs = original.entries();
    -        withTs = withTs == null ? Collections.emptyList() : withTs;
    -        return StreamSupport
    -            .stream(withTs.spliterator(), false)
    -            .filter(this::unexpiredAndUpdateOrCleanup)
    -            .map(TtlMapState::unwrapWithoutTs);
    -    }
    -
    -    private boolean unexpiredAndUpdateOrCleanup(Map.Entry> e) {
    -        UV unexpiredValue;
    -        try {
    -            unexpiredValue = getWithTtlCheckAndUpdate(
    -                e::getValue,
    -                v -> original.put(e.getKey(), v),
    -                () -> original.remove(e.getKey()));
    -        } catch (Exception ex) {
    -            throw new FlinkRuntimeException(ex);
    -        }
    -        return unexpiredValue != null;
    -    }
    -
    -    private static Map.Entry unwrapWithoutTs(Map.Entry> e) {
    -        return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
    +        return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper);
         }
     
         @Override
         public Iterable keys() throws Exception {
    -        return entriesStream().map(Map.Entry::getKey)::iterator;
    +        return entries(Map.Entry::getKey);
         }
     
         @Override
         public Iterable values() throws Exception {
    -        return entriesStream().map(Map.Entry::getValue)::iterator;
    +        return entries(Map.Entry::getValue);
         }
     
         @Override
         public Iterator> iterator() throws Exception {
    -        return entriesStream().iterator();
    +        return entries().iterator();
         }
     
         @Override
         public void clear() {
             original.clear();
         }
    +
    +    private class EntriesIterator implements Iterator {
    +        private final Iterator>> originalIterator;
    +        private final Function, R> resultMapper;
    +        private Map.Entry nextUnexpired = null;
    +        private boolean rightAfterNextIsCalled = false;
    +
    +        private EntriesIterator(
    +            @Nonnull Iterable>> withTs,
    +            @Nonnull Function, R> resultMapper) {
    +            this.originalIterator = withTs.iterator();
    +            this.resultMapper = resultMapper;
    +        }
    +
    +        @Override
    +        public boolean hasNext() {
    +            rightAfterNextIsCalled = false;
    +            while (nextUnexpired == null && originalIterator.hasNext()) {
    +                nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next());
    +            }
    +            return nextUnexpired != null;
    +        }
    +
    +        @Override
    +        public R next() {
    +            if (hasNext()) {
    +                rightAfterNextIsCalled = true;
    +                R result = resultMapper.apply(nextUnexpired);
    +                nextUnexpired = null;
    +                return result;
    +            }
    +            throw new NoSuchElementException();
    +        }
    +
    +        @Override
    +        public void remove() {
    +            if (rightAfterNextIsCalled) {
    --- End diff --

    I think this is problematic, for example in the sequence `hasNext()`, `next()`, `hasNext()`, `remove()`, which is a valid interaction.
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541633#comment-16541633 ]

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202032512

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
    @@ -92,6 +93,10 @@
         @Nullable
         private String queryableStateName;
     
    +    /** Name for queries against state created from this StateDescriptor. */
    +    @Nullable
    +    private StateTtlConfiguration ttlConfig;
    --- End diff --

    I would suggest preferring `@Nonnull` and a `StateTtlConfiguration` that represents disabled TTL, so that the getter never returns `null` and the code can drop its `null` checks.
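The idea in the comment above, a dedicated "disabled" configuration instead of a nullable field, can be sketched as follows. The classes `TtlConfigSketch` and `DescriptorSketch`, their methods, and the `DISABLED` constant are hypothetical names chosen for illustration; they are not the `StateTtlConfiguration` or `StateDescriptor` API.

```java
import java.util.Objects;

/** Demo: callers can query the TTL configuration without any null check. */
class TtlConfigNullObjectDemo {
    public static void main(String[] args) {
        DescriptorSketch descriptor = new DescriptorSketch("last-seen");
        System.out.println(descriptor.getTtlConfig().isEnabled());

        descriptor.enableTtl(TtlConfigSketch.ofMillis(60_000L));
        System.out.println(descriptor.getTtlConfig().isEnabled());
    }
}

/** Hypothetical immutable TTL configuration with a shared instance that means "TTL disabled". */
final class TtlConfigSketch {
    static final TtlConfigSketch DISABLED = new TtlConfigSketch(Long.MAX_VALUE, false);

    private final long ttlMillis;
    private final boolean enabled;

    private TtlConfigSketch(long ttlMillis, boolean enabled) {
        this.ttlMillis = ttlMillis;
        this.enabled = enabled;
    }

    static TtlConfigSketch ofMillis(long ttlMillis) {
        if (ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL must be positive");
        }
        return new TtlConfigSketch(ttlMillis, true);
    }

    boolean isEnabled() {
        return enabled;
    }

    long getTtlMillis() {
        return ttlMillis;
    }
}

/** Hypothetical descriptor whose TTL field is never null. */
class DescriptorSketch {
    private final String name;
    private TtlConfigSketch ttlConfig = TtlConfigSketch.DISABLED;

    DescriptorSketch(String name) {
        this.name = Objects.requireNonNull(name, "name");
    }

    void enableTtl(TtlConfigSketch config) {
        this.ttlConfig = Objects.requireNonNull(config, "config");
    }

    TtlConfigSketch getTtlConfig() {
        return ttlConfig;
    }

    String getName() {
        return name;
    }
}
```

The trade-off of this design is that "no TTL" becomes an ordinary value, so every consumer has to ask `isEnabled()` rather than compare against `null`.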
[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors
[ https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540571#comment-16540571 ]

ASF GitHub Bot commented on FLINK-9701:
---

GitHub user azagrebin opened a pull request:

    https://github.com/apache/flink/pull/6313

    [FLINK-9701] Add TTL in state descriptors

    ## What is the purpose of the change

    This PR activates the TTL feature in state descriptors.

    ## Brief change log

      - add a method enabling TTL with a configuration in the state descriptor
      - integrate the TTL tests with the heap and RocksDB backends

    ## Verifying this change

    TTL unit tests

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
      - The serializers: (yes)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/azagrebin/flink FLINK-9701

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6313.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6313

commit 0f6a3e297b95defeafb1e31899e22195c5985702
Author: Andrey Zagrebin
Date: 2018-07-03T17:23:41Z

    [FLINK-9701] Add TTL in state descriptors
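The pull request is verified by TTL unit tests, and the test context quoted elsewhere in this thread takes a `TtlTimeProvider`. A rough sketch of why an injectable clock makes expiry testable is given below. Everything in it is an assumption made for illustration: `TtlValueHolderSketch` is a hypothetical class, the clock is a plain `LongSupplier`, and the rule "expired once now minus last update is at least the TTL" is a simplification, not a statement about Flink's implementation.

```java
import java.util.function.LongSupplier;

/** Demo: a manually advanced clock drives expiry deterministically, without sleeping. */
class TtlExpiryDemo {
    public static void main(String[] args) {
        long[] now = {0L}; // the test controls time through this cell
        TtlValueHolderSketch<String> state = new TtlValueHolderSketch<>(100L, () -> now[0]);

        state.update("a");
        now[0] = 99L;
        System.out.println(state.value()); // still within the TTL
        now[0] = 100L;
        System.out.println(state.value()); // TTL reached: the value is dropped
    }
}

/** Hypothetical single-value state that forgets its value after ttlMillis without an update. */
class TtlValueHolderSketch<T> {
    private final long ttlMillis;
    private final LongSupplier clock;
    private T value;
    private long lastUpdate;

    TtlValueHolderSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void update(T newValue) {
        value = newValue;
        lastUpdate = clock.getAsLong();
    }

    /** Returns the value, or null if it was never set or has expired (assumed rule, see above). */
    T value() {
        if (value != null && clock.getAsLong() - lastUpdate >= ttlMillis) {
            value = null; // clean up lazily on read
        }
        return value;
    }
}
```

Because time only moves when the test says so, the boundary cases just before and exactly at the TTL can be checked without timing flakiness.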