[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544054#comment-16544054
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6331


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542045#comment-16542045
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6313


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542040#comment-16542040
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6313
  
LGTM, nice work!  Besides one comment about closing the backends after 
tests, the PR is ready. This is no big thing so I will just fix it myself 
before merging now.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542039#comment-16542039
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202130806
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+/** Base class for state backend test context. */
+public abstract class StateBackendTestContext {
+   private final StateBackend stateBackend;
+   private final CheckpointStorageLocation checkpointStorageLocation;
+   private final TtlTimeProvider timeProvider;
+
+   private AbstractKeyedStateBackend keyedStateBackend;
+
+   protected StateBackendTestContext(TtlTimeProvider timeProvider) {
+   this.timeProvider = Preconditions.checkNotNull(timeProvider);
+   this.stateBackend = 
Preconditions.checkNotNull(createStateBackend());
+   this.checkpointStorageLocation = 
createCheckpointStorageLocation();
+   }
+
+   protected abstract StateBackend createStateBackend();
+
+   private CheckpointStorageLocation createCheckpointStorageLocation() {
+   try {
+   return stateBackend
+   .createCheckpointStorage(new JobID())
+   .initializeLocationForCheckpoint(2L);
+   } catch (IOException e) {
+   throw new RuntimeException("unexpected");
+   }
+   }
+
+   void createAndRestoreKeyedStateBackend() {
+   Environment env = new DummyEnvironment();
+   try {
+   if (keyedStateBackend != null) {
+   keyedStateBackend.dispose();
--- End diff --

There is a problem that the backend is only disposed here and not after 
each test, this leads to some native errors when I run the test. I suggest to 
give this context a `dispose` method and call it in a `@After` method.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541921#comment-16541921
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202103083
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public  IS createState(
+   public  IS createInternalState(
--- End diff --

No, timers cannot use state descriptor, they cannot extend `State`


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541825#comment-16541825
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202081209
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public  IS createState(
+   public  IS createInternalState(
--- End diff --

The method uses state descriptor to produce backend specific 
`InternalKvState`s. Is it also applicable for timers? It could be named 
`createInternalKvState` and `InternalKvStateFactory`.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541744#comment-16541744
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202058484
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public  IS createState(
+   public  IS createInternalState(
--- End diff --

Why are we adding `Internal` here? I would suggest to call the method 
`create(Internal?)KeyValueState`, because there will also be other state in the 
future (timers). 


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541738#comment-16541738
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202057089
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -93,7 +94,7 @@
private static final long serialVersionUID = -8191916350224044011L;
 
/** Maximum size of state that is stored with the metadata, rather than 
in files (1 MiByte). */
-   public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+   private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
--- End diff --

Ok, it is `PublicEvolving` so you can keep the change.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541735#comment-16541735
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202056626
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -93,7 +94,7 @@
private static final long serialVersionUID = -8191916350224044011L;
 
/** Maximum size of state that is stored with the metadata, rather than 
in files (1 MiByte). */
-   public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+   private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
--- End diff --

This should not be changed, because the class is user-facing API and 
someone might have used it.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541682#comment-16541682
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202042548
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
 
@Override
public Iterable> entries() throws Exception {
-   return entriesStream()::iterator;
+   return entries(e -> e);
}
 
-   private Stream> entriesStream() throws Exception {
+   private  Iterable entries(
+   Function, R> resultMapper) throws Exception {
Iterable>> withTs = 
original.entries();
-   withTs = withTs == null ? Collections.emptyList() : withTs;
-   return StreamSupport
-   .stream(withTs.spliterator(), false)
-   .filter(this::unexpiredAndUpdateOrCleanup)
-   .map(TtlMapState::unwrapWithoutTs);
-   }
-
-   private boolean unexpiredAndUpdateOrCleanup(Map.Entry> 
e) {
-   UV unexpiredValue;
-   try {
-   unexpiredValue = getWithTtlCheckAndUpdate(
-   e::getValue,
-   v -> original.put(e.getKey(), v),
-   () -> original.remove(e.getKey()));
-   } catch (Exception ex) {
-   throw new FlinkRuntimeException(ex);
-   }
-   return unexpiredValue != null;
-   }
-
-   private static  Map.Entry unwrapWithoutTs(Map.Entry> e) {
-   return new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().getUserValue());
+   return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper);
}
 
@Override
public Iterable keys() throws Exception {
-   return entriesStream().map(Map.Entry::getKey)::iterator;
+   return entries(Map.Entry::getKey);
}
 
@Override
public Iterable values() throws Exception {
-   return entriesStream().map(Map.Entry::getValue)::iterator;
+   return entries(Map.Entry::getValue);
}
 
@Override
public Iterator> iterator() throws Exception {
-   return entriesStream().iterator();
+   return entries().iterator();
}
 
@Override
public void clear() {
original.clear();
}
+
+   private class EntriesIterator implements Iterator {
+   private final Iterator>> 
originalIterator;
+   private final Function, R> resultMapper;
+   private Map.Entry nextUnexpired = null;
+   private boolean rightAfterNextIsCalled = false;
+
+   private EntriesIterator(
+   @Nonnull Iterable>> withTs,
+   @Nonnull Function, R> resultMapper) {
+   this.originalIterator = withTs.iterator();
+   this.resultMapper = resultMapper;
+   }
+
+   @Override
+   public boolean hasNext() {
+   rightAfterNextIsCalled = false;
+   while (nextUnexpired == null && 
originalIterator.hasNext()) {
+   nextUnexpired = 
getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+   }
+   return nextUnexpired != null;
+   }
+
+   @Override
+   public R next() {
+   if (hasNext()) {
+   rightAfterNextIsCalled = true;
+   R result = resultMapper.apply(nextUnexpired);
+   nextUnexpired = null;
+   return result;
+   }
+   throw new NoSuchElementException();
+   }
+
+   @Override
+   public void remove() {
+   if (rightAfterNextIsCalled) {
--- End diff --

I agree, seems like there is no good solution for this.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by 

[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541681#comment-16541681
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202041863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
 
@Override
public Iterable> entries() throws Exception {
-   return entriesStream()::iterator;
+   return entries(e -> e);
}
 
-   private Stream> entriesStream() throws Exception {
+   private  Iterable entries(
+   Function, R> resultMapper) throws Exception {
Iterable>> withTs = 
original.entries();
-   withTs = withTs == null ? Collections.emptyList() : withTs;
-   return StreamSupport
-   .stream(withTs.spliterator(), false)
-   .filter(this::unexpiredAndUpdateOrCleanup)
-   .map(TtlMapState::unwrapWithoutTs);
-   }
-
-   private boolean unexpiredAndUpdateOrCleanup(Map.Entry> 
e) {
-   UV unexpiredValue;
-   try {
-   unexpiredValue = getWithTtlCheckAndUpdate(
-   e::getValue,
-   v -> original.put(e.getKey(), v),
-   () -> original.remove(e.getKey()));
-   } catch (Exception ex) {
-   throw new FlinkRuntimeException(ex);
-   }
-   return unexpiredValue != null;
-   }
-
-   private static  Map.Entry unwrapWithoutTs(Map.Entry> e) {
-   return new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().getUserValue());
+   return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper);
}
 
@Override
public Iterable keys() throws Exception {
-   return entriesStream().map(Map.Entry::getKey)::iterator;
+   return entries(Map.Entry::getKey);
}
 
@Override
public Iterable values() throws Exception {
-   return entriesStream().map(Map.Entry::getValue)::iterator;
+   return entries(Map.Entry::getValue);
}
 
@Override
public Iterator> iterator() throws Exception {
-   return entriesStream().iterator();
+   return entries().iterator();
}
 
@Override
public void clear() {
original.clear();
}
+
+   private class EntriesIterator implements Iterator {
+   private final Iterator>> 
originalIterator;
+   private final Function, R> resultMapper;
+   private Map.Entry nextUnexpired = null;
+   private boolean rightAfterNextIsCalled = false;
+
+   private EntriesIterator(
+   @Nonnull Iterable>> withTs,
+   @Nonnull Function, R> resultMapper) {
+   this.originalIterator = withTs.iterator();
+   this.resultMapper = resultMapper;
+   }
+
+   @Override
+   public boolean hasNext() {
+   rightAfterNextIsCalled = false;
+   while (nextUnexpired == null && 
originalIterator.hasNext()) {
+   nextUnexpired = 
getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+   }
+   return nextUnexpired != null;
+   }
+
+   @Override
+   public R next() {
+   if (hasNext()) {
+   rightAfterNextIsCalled = true;
+   R result = resultMapper.apply(nextUnexpired);
+   nextUnexpired = null;
+   return result;
+   }
+   throw new NoSuchElementException();
+   }
+
+   @Override
+   public void remove() {
+   if (rightAfterNextIsCalled) {
--- End diff --

The problem here is that after calling `next()` and then `hasNext()`, 
`originalIterator` might have been advanced by `hasNext()` skipping expired 
entries and has other current element, it means that `remove` can not be called 
on it consistently with wrapping iterator current element.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  

[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541673#comment-16541673
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202040710
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -92,6 +93,10 @@
@Nullable
private String queryableStateName;
 
+   /** Name for queries against state created from this StateDescriptor. */
+   @Nullable
+   private StateTtlConfiguration ttlConfig;
--- End diff --

 will roll it back


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541656#comment-16541656
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202036711
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
 
@Override
public Iterable> entries() throws Exception {
-   return entriesStream()::iterator;
+   return entries(e -> e);
}
 
-   private Stream> entriesStream() throws Exception {
+   private  Iterable entries(
+   Function, R> resultMapper) throws Exception {
Iterable>> withTs = 
original.entries();
-   withTs = withTs == null ? Collections.emptyList() : withTs;
-   return StreamSupport
-   .stream(withTs.spliterator(), false)
-   .filter(this::unexpiredAndUpdateOrCleanup)
-   .map(TtlMapState::unwrapWithoutTs);
-   }
-
-   private boolean unexpiredAndUpdateOrCleanup(Map.Entry> 
e) {
-   UV unexpiredValue;
-   try {
-   unexpiredValue = getWithTtlCheckAndUpdate(
-   e::getValue,
-   v -> original.put(e.getKey(), v),
-   () -> original.remove(e.getKey()));
-   } catch (Exception ex) {
-   throw new FlinkRuntimeException(ex);
-   }
-   return unexpiredValue != null;
-   }
-
-   private static  Map.Entry unwrapWithoutTs(Map.Entry> e) {
-   return new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().getUserValue());
+   return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper);
}
 
@Override
public Iterable keys() throws Exception {
-   return entriesStream().map(Map.Entry::getKey)::iterator;
+   return entries(Map.Entry::getKey);
}
 
@Override
public Iterable values() throws Exception {
-   return entriesStream().map(Map.Entry::getValue)::iterator;
+   return entries(Map.Entry::getValue);
}
 
@Override
public Iterator> iterator() throws Exception {
-   return entriesStream().iterator();
+   return entries().iterator();
}
 
@Override
public void clear() {
original.clear();
}
+
+   private class EntriesIterator implements Iterator {
+   private final Iterator>> 
originalIterator;
+   private final Function, R> resultMapper;
+   private Map.Entry nextUnexpired = null;
+   private boolean rightAfterNextIsCalled = false;
+
+   private EntriesIterator(
+   @Nonnull Iterable>> withTs,
+   @Nonnull Function, R> resultMapper) {
+   this.originalIterator = withTs.iterator();
+   this.resultMapper = resultMapper;
+   }
+
+   @Override
+   public boolean hasNext() {
+   rightAfterNextIsCalled = false;
+   while (nextUnexpired == null && 
originalIterator.hasNext()) {
+   nextUnexpired = 
getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+   }
+   return nextUnexpired != null;
+   }
+
+   @Override
+   public R next() {
+   if (hasNext()) {
+   rightAfterNextIsCalled = true;
+   R result = resultMapper.apply(nextUnexpired);
+   nextUnexpired = null;
+   return result;
+   }
+   throw new NoSuchElementException();
+   }
+
+   @Override
+   public void remove() {
+   if (rightAfterNextIsCalled) {
--- End diff --

I think this a problematic for example in the sequence `hasNext()`, 
``next()`, `hasNext()`, `remove()` which is a valid interaction.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  

[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541633#comment-16541633
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202032512
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -92,6 +93,10 @@
@Nullable
private String queryableStateName;
 
+   /** Name for queries against state created from this StateDescriptor. */
+   @Nullable
+   private StateTtlConfiguration ttlConfig;
--- End diff --

I would suggest to prefer @Nonnull and a `StateTtlConfiguration` that 
represents `disabled ttl`. So that the getter will also not return `null` and 
code can drop `null` checks.


> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9701) Activate TTL in state descriptors

2018-07-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540571#comment-16540571
 ] 

ASF GitHub Bot commented on FLINK-9701:
---

GitHub user azagrebin opened a pull request:

https://github.com/apache/flink/pull/6313

[FLINK-9701] Add TTL in state descriptors

## What is the purpose of the change

This PR activates TTL feature in state descriptors.

## Brief change log

  - add method enabling TTL with configuration in state descriptor
  - integrate TTL tests with heap and rocksdb backends


## Verifying this change

TTL unit tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (yes)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/azagrebin/flink FLINK-9701

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6313.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6313


commit 0f6a3e297b95defeafb1e31899e22195c5985702
Author: Andrey Zagrebin 
Date:   2018-07-03T17:23:41Z

[FLINK-9701] Add TTL in state descriptors




> Activate TTL in state descriptors
> -
>
> Key: FLINK-9701
> URL: https://issues.apache.org/jira/browse/FLINK-9701
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)