[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6313


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202130806
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+/** Base class for state backend test context. */
+public abstract class StateBackendTestContext {
+   private final StateBackend stateBackend;
+   private final CheckpointStorageLocation checkpointStorageLocation;
+   private final TtlTimeProvider timeProvider;
+
+   private AbstractKeyedStateBackend keyedStateBackend;
+
+   protected StateBackendTestContext(TtlTimeProvider timeProvider) {
+   this.timeProvider = Preconditions.checkNotNull(timeProvider);
+   this.stateBackend = Preconditions.checkNotNull(createStateBackend());
+   this.checkpointStorageLocation = createCheckpointStorageLocation();
+   }
+
+   protected abstract StateBackend createStateBackend();
+
+   private CheckpointStorageLocation createCheckpointStorageLocation() {
+   try {
+   return stateBackend
+   .createCheckpointStorage(new JobID())
+   .initializeLocationForCheckpoint(2L);
+   } catch (IOException e) {
+   throw new RuntimeException("unexpected");
+   }
+   }
+
+   void createAndRestoreKeyedStateBackend() {
+   Environment env = new DummyEnvironment();
+   try {
+   if (keyedStateBackend != null) {
+   keyedStateBackend.dispose();
--- End diff --

The problem is that the backend is only disposed here and not after each test, which leads to native errors when I run the tests. I suggest giving this context a `dispose` method and calling it from an `@After` method.
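A minimal sketch of the suggested lifecycle, written in plain Java so it runs without JUnit or Flink. `FakeBackend`, `TestContext` and `LifecycleDemo.runTest` are hypothetical stand-ins: `runTest` plays the role of JUnit's `@Before`/`@Test`/`@After` sequence, so the backend is disposed after every test, even when the test body throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a backend that holds native resources (e.g. RocksDB). */
class FakeBackend {
    private boolean disposed = false;

    void dispose() {
        disposed = true;
    }

    boolean isDisposed() {
        return disposed;
    }
}

/** Hypothetical test context that owns at most one backend at a time. */
class TestContext {
    /** Every backend ever created, kept only so the demo can check they were all disposed. */
    final List<FakeBackend> created = new ArrayList<>();
    private FakeBackend backend;

    void createAndRestoreBackend() {
        dispose(); // release the previous backend before creating a new one
        backend = new FakeBackend();
        created.add(backend);
    }

    /** Idempotent, so it is safe to call from both the test body and an @After hook. */
    void dispose() {
        if (backend != null) {
            backend.dispose();
            backend = null;
        }
    }
}

class LifecycleDemo {
    /** Mimics JUnit: the teardown step runs even if the test body throws. */
    static void runTest(TestContext ctx, Runnable testBody) {
        try {
            ctx.createAndRestoreBackend(); // what a @Before method would do
            testBody.run();                // the @Test method itself
        } finally {
            ctx.dispose();                 // what the suggested @After method would do
        }
    }
}
```

Making `dispose()` idempotent lets `createAndRestoreKeyedStateBackend()` keep releasing the previous backend, while the `@After` hook guarantees that the last one is released as well.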


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202103083
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public  IS createState(
+   public  IS createInternalState(
--- End diff --

No, timers cannot use a state descriptor because they cannot extend `State`.


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202081209
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public  IS createState(
+   public  IS createInternalState(
--- End diff --

The method uses the state descriptor to produce backend-specific `InternalKvState`s. Does this also apply to timers? If not, the method could be named `createInternalKvState` and the factory `InternalKvStateFactory`.


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202058484
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public  IS createState(
+   public  IS createInternalState(
--- End diff --

Why are we adding `Internal` here? I would suggest calling the method `create(Internal?)KeyValueState`, because there will also be other kinds of state in the future (timers).


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202057089
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---
@@ -93,7 +94,7 @@
private static final long serialVersionUID = -8191916350224044011L;
 
/** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
-   public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+   private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
--- End diff --

OK, it is `@PublicEvolving`, so you can keep the change.


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202056626
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---
@@ -93,7 +94,7 @@
private static final long serialVersionUID = -8191916350224044011L;
 
/** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
-   public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+   private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
--- End diff --

This should not be changed, because the class is user-facing API and someone might be using this constant.


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202042548
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
@@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
 
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-   return entriesStream()::iterator;
+   return entries(e -> e);
}
 
-   private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
+   private <R> Iterable<R> entries(
+   Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
-   withTs = withTs == null ? Collections.emptyList() : withTs;
-   return StreamSupport
-   .stream(withTs.spliterator(), false)
-   .filter(this::unexpiredAndUpdateOrCleanup)
-   .map(TtlMapState::unwrapWithoutTs);
-   }
-
-   private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
-   UV unexpiredValue;
-   try {
-   unexpiredValue = getWithTtlCheckAndUpdate(
-   e::getValue,
-   v -> original.put(e.getKey(), v),
-   () -> original.remove(e.getKey()));
-   } catch (Exception ex) {
-   throw new FlinkRuntimeException(ex);
-   }
-   return unexpiredValue != null;
-   }
-
-   private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, TtlValue<UV>> e) {
-   return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
+   return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper);
}
 
@Override
public Iterable<UK> keys() throws Exception {
-   return entriesStream().map(Map.Entry::getKey)::iterator;
+   return entries(Map.Entry::getKey);
}
 
@Override
public Iterable<UV> values() throws Exception {
-   return entriesStream().map(Map.Entry::getValue)::iterator;
+   return entries(Map.Entry::getValue);
}
 
@Override
public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
-   return entriesStream().iterator();
+   return entries().iterator();
}
 
@Override
public void clear() {
original.clear();
}
+
+   private class EntriesIterator<R> implements Iterator<R> {
+   private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator;
+   private final Function<Map.Entry<UK, UV>, R> resultMapper;
+   private Map.Entry<UK, UV> nextUnexpired = null;
+   private boolean rightAfterNextIsCalled = false;
+
+   private EntriesIterator(
+   @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
+   @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
+   this.originalIterator = withTs.iterator();
+   this.resultMapper = resultMapper;
+   }
+
+   @Override
+   public boolean hasNext() {
+   rightAfterNextIsCalled = false;
+   while (nextUnexpired == null && originalIterator.hasNext()) {
+   nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+   }
+   return nextUnexpired != null;
+   }
+
+   @Override
+   public R next() {
+   if (hasNext()) {
+   rightAfterNextIsCalled = true;
+   R result = resultMapper.apply(nextUnexpired);
+   nextUnexpired = null;
+   return result;
+   }
+   throw new NoSuchElementException();
+   }
+
+   @Override
+   public void remove() {
+   if (rightAfterNextIsCalled) {
--- End diff --

I agree; it seems there is no good solution for this.


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202041863
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
@@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
 
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-   return entriesStream()::iterator;
+   return entries(e -> e);
}
 
-   private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
+   private <R> Iterable<R> entries(
+   Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
-   withTs = withTs == null ? Collections.emptyList() : withTs;
-   return StreamSupport
-   .stream(withTs.spliterator(), false)
-   .filter(this::unexpiredAndUpdateOrCleanup)
-   .map(TtlMapState::unwrapWithoutTs);
-   }
-
-   private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
-   UV unexpiredValue;
-   try {
-   unexpiredValue = getWithTtlCheckAndUpdate(
-   e::getValue,
-   v -> original.put(e.getKey(), v),
-   () -> original.remove(e.getKey()));
-   } catch (Exception ex) {
-   throw new FlinkRuntimeException(ex);
-   }
-   return unexpiredValue != null;
-   }
-
-   private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, TtlValue<UV>> e) {
-   return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
+   return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper);
}
 
@Override
public Iterable<UK> keys() throws Exception {
-   return entriesStream().map(Map.Entry::getKey)::iterator;
+   return entries(Map.Entry::getKey);
}
 
@Override
public Iterable<UV> values() throws Exception {
-   return entriesStream().map(Map.Entry::getValue)::iterator;
+   return entries(Map.Entry::getValue);
}
 
@Override
public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
-   return entriesStream().iterator();
+   return entries().iterator();
}
 
@Override
public void clear() {
original.clear();
}
+
+   private class EntriesIterator<R> implements Iterator<R> {
+   private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator;
+   private final Function<Map.Entry<UK, UV>, R> resultMapper;
+   private Map.Entry<UK, UV> nextUnexpired = null;
+   private boolean rightAfterNextIsCalled = false;
+
+   private EntriesIterator(
+   @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
+   @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
+   this.originalIterator = withTs.iterator();
+   this.resultMapper = resultMapper;
+   }
+
+   @Override
+   public boolean hasNext() {
+   rightAfterNextIsCalled = false;
+   while (nextUnexpired == null && originalIterator.hasNext()) {
+   nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+   }
+   return nextUnexpired != null;
+   }
+
+   @Override
+   public R next() {
+   if (hasNext()) {
+   rightAfterNextIsCalled = true;
+   R result = resultMapper.apply(nextUnexpired);
+   nextUnexpired = null;
+   return result;
+   }
+   throw new NoSuchElementException();
+   }
+
+   @Override
+   public void remove() {
+   if (rightAfterNextIsCalled) {
--- End diff --

The problem is that after calling `next()` and then `hasNext()`, `originalIterator` may already have been advanced by `hasNext()` past expired entries, so its current element is no longer the one the wrapping iterator last returned. This means `remove` cannot be delegated to it consistently.
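To illustrate the failure mode, here is a hypothetical, simplified filtering iterator (not Flink's `EntriesIterator`) that skips "expired" elements and naively delegates `remove()` to the underlying iterator. After `next()` returns `"a"`, a further `hasNext()` skips the expired element and consumes `"b"` from the source, so `remove()` deletes `"b"` instead of `"a"`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/**
 * Hypothetical filtering iterator that skips elements rejected by {@code keep}
 * (think: expired entries) and naively delegates remove() to the source iterator.
 */
class NaiveFilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private final Predicate<T> keep;
    private T lookAhead; // already consumed from `source`, not yet returned

    NaiveFilteringIterator(Iterator<T> source, Predicate<T> keep) {
        this.source = source;
        this.keep = keep;
    }

    @Override
    public boolean hasNext() {
        // May advance `source` past skipped elements AND past the next kept one.
        while (lookAhead == null && source.hasNext()) {
            T candidate = source.next();
            if (keep.test(candidate)) {
                lookAhead = candidate;
            }
        }
        return lookAhead != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = lookAhead;
        lookAhead = null;
        return result;
    }

    @Override
    public void remove() {
        // Unsafe: removes the element `source` returned last, which after an
        // extra hasNext() is the look-ahead element, not the one we returned.
        source.remove();
    }
}
```

Running `hasNext()`, `next()`, `hasNext()`, `remove()` over `["a", "x-expired", "b"]` leaves `["a", "x-expired"]`: the element the caller meant to delete survives and an unrelated one is gone.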


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202040710
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
@@ -92,6 +93,10 @@
@Nullable
private String queryableStateName;
 
+   /** Name for queries against state created from this StateDescriptor. */
+   @Nullable
+   private StateTtlConfiguration ttlConfig;
--- End diff --

👍 will roll it back


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202036711
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---
@@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
 
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-   return entriesStream()::iterator;
+   return entries(e -> e);
}
 
-   private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
+   private <R> Iterable<R> entries(
+   Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
-   withTs = withTs == null ? Collections.emptyList() : withTs;
-   return StreamSupport
-   .stream(withTs.spliterator(), false)
-   .filter(this::unexpiredAndUpdateOrCleanup)
-   .map(TtlMapState::unwrapWithoutTs);
-   }
-
-   private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
-   UV unexpiredValue;
-   try {
-   unexpiredValue = getWithTtlCheckAndUpdate(
-   e::getValue,
-   v -> original.put(e.getKey(), v),
-   () -> original.remove(e.getKey()));
-   } catch (Exception ex) {
-   throw new FlinkRuntimeException(ex);
-   }
-   return unexpiredValue != null;
-   }
-
-   private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, TtlValue<UV>> e) {
-   return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
+   return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper);
}
 
@Override
public Iterable<UK> keys() throws Exception {
-   return entriesStream().map(Map.Entry::getKey)::iterator;
+   return entries(Map.Entry::getKey);
}
 
@Override
public Iterable<UV> values() throws Exception {
-   return entriesStream().map(Map.Entry::getValue)::iterator;
+   return entries(Map.Entry::getValue);
}
 
@Override
public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
-   return entriesStream().iterator();
+   return entries().iterator();
}
 
@Override
public void clear() {
original.clear();
}
+
+   private class EntriesIterator<R> implements Iterator<R> {
+   private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator;
+   private final Function<Map.Entry<UK, UV>, R> resultMapper;
+   private Map.Entry<UK, UV> nextUnexpired = null;
+   private boolean rightAfterNextIsCalled = false;
+
+   private EntriesIterator(
+   @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
+   @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
+   this.originalIterator = withTs.iterator();
+   this.resultMapper = resultMapper;
+   }
+
+   @Override
+   public boolean hasNext() {
+   rightAfterNextIsCalled = false;
+   while (nextUnexpired == null && originalIterator.hasNext()) {
+   nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+   }
+   return nextUnexpired != null;
+   }
+
+   @Override
+   public R next() {
+   if (hasNext()) {
+   rightAfterNextIsCalled = true;
+   R result = resultMapper.apply(nextUnexpired);
+   nextUnexpired = null;
+   return result;
+   }
+   throw new NoSuchElementException();
+   }
+
+   @Override
+   public void remove() {
+   if (rightAfterNextIsCalled) {
--- End diff --

I think this is problematic, for example in the sequence `hasNext()`, `next()`, `hasNext()`, `remove()`, which is a valid interaction.
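One way to make that sequence fail loudly instead of removing the wrong element is to allow `remove()` only immediately after `next()`, in the spirit of the `rightAfterNextIsCalled` flag in the diff. The sketch below is a hypothetical, simplified iterator over a plain collection, not the code in the PR. It throws `IllegalStateException`, the exception `Iterator.remove()` uses for calls that are not allowed in the current state; note that this is stricter than the general `Iterator` contract, under which the sequence above is valid.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/**
 * Hypothetical filtering iterator that supports remove() only immediately after
 * next(), tracked by a flag similar to rightAfterNextIsCalled in the diff.
 */
class GuardedFilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private final Predicate<T> keep;
    private T lookAhead; // already consumed from `source`, not yet returned
    private boolean rightAfterNext = false;

    GuardedFilteringIterator(Iterator<T> source, Predicate<T> keep) {
        this.source = source;
        this.keep = keep;
    }

    @Override
    public boolean hasNext() {
        // Conservatively invalidate remove(): the look-ahead below may move `source`.
        rightAfterNext = false;
        while (lookAhead == null && source.hasNext()) {
            T candidate = source.next();
            if (keep.test(candidate)) {
                lookAhead = candidate;
            }
        }
        return lookAhead != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // At this point `source` last returned exactly the element handed out here.
        rightAfterNext = true;
        T result = lookAhead;
        lookAhead = null;
        return result;
    }

    @Override
    public void remove() {
        if (!rightAfterNext) {
            throw new IllegalStateException("remove() is supported only right after next()");
        }
        source.remove();
        rightAfterNext = false; // a second remove() without next() is not allowed either
    }
}
```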


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202032512
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
@@ -92,6 +93,10 @@
@Nullable
private String queryableStateName;
 
+   /** Name for queries against state created from this StateDescriptor. */
+   @Nullable
+   private StateTtlConfiguration ttlConfig;
--- End diff --

I would suggest preferring `@Nonnull` and a `StateTtlConfiguration` that represents disabled TTL, so that the getter never returns `null` and code can drop its `null` checks.
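A minimal sketch of the null-object approach suggested here. `TtlConfig`, `TtlConfig.DISABLED`, `Descriptor` and `enableTimeToLive` are hypothetical names chosen for illustration, not the API introduced in this PR. The descriptor field starts as the shared `DISABLED` instance, so the getter never returns `null` and callers branch on `isEnabled()` instead.

```java
import java.util.Objects;

/** Hypothetical TTL configuration; DISABLED is a shared null object used instead of null. */
final class TtlConfig {
    static final TtlConfig DISABLED = new TtlConfig(false, Long.MAX_VALUE);

    private final boolean enabled;
    private final long ttlMillis;

    private TtlConfig(boolean enabled, long ttlMillis) {
        this.enabled = enabled;
        this.ttlMillis = ttlMillis;
    }

    static TtlConfig ofMillis(long ttlMillis) {
        if (ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL must be positive: " + ttlMillis);
        }
        return new TtlConfig(true, ttlMillis);
    }

    boolean isEnabled() {
        return enabled;
    }

    long getTtlMillis() {
        return ttlMillis;
    }
}

/** Hypothetical descriptor whose TTL field is never null. */
class Descriptor {
    private TtlConfig ttlConfig = TtlConfig.DISABLED;

    void enableTimeToLive(TtlConfig config) {
        this.ttlConfig = Objects.requireNonNull(config, "config");
    }

    /** Never returns null, so callers do not need a null check. */
    TtlConfig getTtlConfig() {
        return ttlConfig;
    }
}

class TtlConfigDemo {
    /** Example caller: branches on isEnabled() instead of checking for null. */
    static String describe(Descriptor descriptor) {
        TtlConfig config = descriptor.getTtlConfig();
        return config.isEnabled() ? "ttl=" + config.getTtlMillis() + "ms" : "ttl disabled";
    }
}
```

Passing `null` to `enableTimeToLive` fails fast with a `NullPointerException`, which keeps the non-null invariant enforced in one place.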


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-11 Thread azagrebin
GitHub user azagrebin opened a pull request:

https://github.com/apache/flink/pull/6313

[FLINK-9701] Add TTL in state descriptors

## What is the purpose of the change

This PR activates the TTL feature in state descriptors.

## Brief change log

  - add a method to state descriptors that enables TTL with a given configuration
  - integrate TTL tests with the heap and RocksDB backends
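To make the TTL semantics concrete, here is a minimal, self-contained sketch of per-entry TTL with lazy cleanup, loosely modeled on the `getWithTtlCheckAndUpdate` helper and the `TtlTimeProvider` that appear in the diffs in this thread. All names (`TimestampedValue`, `TtlMap`) are hypothetical, the clock is injected as a `LongSupplier` so tests can control time, and the sketch assumes a policy where reads refresh the timestamp; it is not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** A user value paired with the time it was last written or refreshed. */
final class TimestampedValue<V> {
    final V userValue;
    final long lastAccessMillis;

    TimestampedValue(V userValue, long lastAccessMillis) {
        this.userValue = userValue;
        this.lastAccessMillis = lastAccessMillis;
    }
}

/**
 * Hypothetical map with per-entry TTL. Expired entries are removed lazily on read,
 * and reads of live entries refresh their timestamp.
 */
class TtlMap<K, V> {
    private final Map<K, TimestampedValue<V>> original = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injected so tests can control time

    TtlMap(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void put(K key, V value) {
        original.put(key, new TimestampedValue<>(value, clock.getAsLong()));
    }

    /** Returns the live value, or null if the key is absent or its entry has expired. */
    V get(K key) {
        TimestampedValue<V> withTs = original.get(key);
        if (withTs == null) {
            return null;
        }
        long now = clock.getAsLong();
        if (now - withTs.lastAccessMillis >= ttlMillis) {
            original.remove(key); // clean up the expired entry
            return null;
        }
        original.put(key, new TimestampedValue<>(withTs.userValue, now)); // refresh on read
        return withTs.userValue;
    }

    /** Number of stored entries, including expired ones not yet cleaned up. */
    int storedEntries() {
        return original.size();
    }
}
```

Because cleanup only happens on access, `storedEntries()` can still count stale entries until they are read.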


## Verifying this change

TTL unit tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
  - The serializers: (yes)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/azagrebin/flink FLINK-9701

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6313.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6313


commit 0f6a3e297b95defeafb1e31899e22195c5985702
Author: Andrey Zagrebin 
Date:   2018-07-03T17:23:41Z

[FLINK-9701] Add TTL in state descriptors




---