[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-07-12 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r201971238
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<T>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable<TtlValue<T>> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+   List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List<T> getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

Got it, thanks for your explanation.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-07-12 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r201941972
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+   private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

Hi @Aitozi, for the current RocksDB implementation of list state you are right. However, there was, for example, an effort to make lists scalable in RocksDB the same way as maps, and in that case `get()` could be lazy.

This TTL implementation does not make any assumptions about the underlying state backend. The generic user-facing state API returns an `Iterable`, which in general can be lazy, so the TTL wrapper tries to keep it lazy wherever possible.
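To illustrate the point about keeping the result lazy, here is a minimal sketch in plain Java (not Flink's actual `TtlListState`). It wraps an arbitrary `Iterable` of timestamped values and skips expired entries only while the caller iterates. `LazyTtlIterable`, `Stamped` and the expiry rule `timestamp + ttl <= now` are hypothetical choices made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical sketch (not Flink code): hides expired values of an Iterable lazily. */
public class LazyTtlIterable<T> implements Iterable<T> {

    /** Hypothetical holder: a user value plus the timestamp (ms) it was last written. */
    public static final class Stamped<T> {
        final T value;
        final long timestamp;

        public Stamped(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Iterable<Stamped<T>> underlying;
    private final long ttl;
    private final long now;

    public LazyTtlIterable(Iterable<Stamped<T>> underlying, long ttl, long now) {
        this.underlying = underlying;
        this.ttl = ttl;
        this.now = now;
    }

    /** Assumed expiry rule: expired once ttl ms have passed since the timestamp. */
    private boolean expired(Stamped<T> v) {
        return v.timestamp + ttl <= now;
    }

    @Override
    public Iterator<T> iterator() {
        // Nothing is read from the underlying Iterable until iterator() is called.
        Iterator<Stamped<T>> it = underlying.iterator();
        return new Iterator<T>() {
            private Stamped<T> next = advance();

            // Pulls elements only until the next unexpired one is found.
            private Stamped<T> advance() {
                while (it.hasNext()) {
                    Stamped<T> candidate = it.next();
                    if (!expired(candidate)) {
                        return candidate;
                    }
                }
                return null;
            }

            @Override
            public boolean hasNext() {
                return next != null;
            }

            @Override
            public T next() {
                if (next == null) {
                    throw new NoSuchElementException();
                }
                T result = next.value;
                next = advance();
                return result;
            }
        };
    }

    public static void main(String[] args) {
        List<Stamped<String>> stored = new ArrayList<>();
        stored.add(new Stamped<>("old", 0L));
        stored.add(new Stamped<>("fresh", 90L));
        // ttl = 50 ms, now = 100 ms: "old" has expired, "fresh" has not.
        for (String s : new LazyTtlIterable<>(stored, 50L, 100L)) {
            System.out.println(s);
        }
    }
}
```

If the underlying `Iterable` is itself lazy, nothing is fetched until iteration starts. If it is an already materialised list, as with the current RocksDB list state, the wrapper only avoids making an extra copy.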


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-07-11 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r201895445
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+   private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

Hi @azagrebin, I have a small doubt about what you said:

> return Iterable and avoid querying backend if not needed

But when dealing with `ListState`, `original.get()` has already queried the original `Iterable` from RocksDB, hasn't it? Is this approach just lazily iterating over elements that are already in memory?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6186


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-29 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r199109435
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,134 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <UK> Type of the user entry key of state with TTL
+ * @param <UV> Type of the user entry value of state with TTL
+ */
+class TtlMapState<K, N, UK, UV>
+   extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
+   implements InternalMapState<K, N, UK, UV> {
+   TtlMapState(
+   InternalMapState<K, N, UK, TtlValue<UV>> original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<Map<UK, UV>> valueSerializer) {
+   super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+   return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map<UK, UV> map) throws Exception {
+   if (map == null) {
+   return;
+   }
+   Map<UK, TtlValue<UV>> ttlMap = new HashMap<>();

We can already initialize the new map with `map.size()`.
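Here is a minimal sketch of that suggestion using plain `java.util` types. The `Stamped` holder and the `wrapAll` helper are hypothetical stand-ins for `TtlValue` and the body of `putAll`. One caveat applies: `HashMap(int)` takes an initial *capacity*, so with the default load factor of 0.75 a map sized at exactly `map.size()` may still resize once. The sketch divides by the load factor to avoid that.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a pre-sized TTL wrapping step for putAll. */
public class PresizedTtlWrap {

    /** Hypothetical value holder pairing a user value with its write timestamp. */
    public static final class Stamped<V> {
        final V value;
        final long timestamp;

        Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Wraps every value with the same timestamp; returns an empty map for null input. */
    public static <K, V> Map<K, Stamped<V>> wrapAll(Map<K, V> map, long now) {
        if (map == null) {
            return new HashMap<>();
        }
        // Capacity chosen so that map.size() entries fit under the default 0.75 load factor.
        int capacity = (int) Math.ceil(map.size() / 0.75);
        Map<K, Stamped<V>> ttlMap = new HashMap<>(capacity);
        for (Map.Entry<K, V> e : map.entrySet()) {
            ttlMap.put(e.getKey(), new Stamped<>(e.getValue(), now));
        }
        return ttlMap;
    }

    public static void main(String[] args) {
        Map<String, Integer> input = new HashMap<>();
        input.put("a", 1);
        input.put("b", 2);
        Map<String, Stamped<Integer>> wrapped = wrapAll(input, 42L);
        System.out.println(wrapped.size()); // prints 2
    }
}
```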


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-29 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r199082008
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
--- End diff --

My idea was to keep this method lazy when possible. Currently this implementation does not assume that the underlying state returns a `List`. Instead, it treats the result as an `Iterable` and lazily wraps it with `IteratorWithCleanup`. Only when it has to update timestamps on read does it materialise the `Iterable` into a `List`.
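The approach described above can be sketched with plain Java collections. A `collect` helper that reuses the instance when it already is a `List`, and copies only otherwise, is one way to do the materialisation. `MaterialiseOnDemand`, `read` and the `updateOnRead` flag are hypothetical stand-ins for `get()` and `updateTsOnRead` from the diff, not Flink's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/** Hypothetical sketch: keep an Iterable lazy, materialise it only when required. */
public class MaterialiseOnDemand {

    /** Reuses the instance if it already is a List, otherwise copies it once. */
    @SuppressWarnings("unchecked")
    public static <E> List<E> collect(Iterable<E> iterable) {
        if (iterable instanceof List) {
            return (List<E>) iterable;
        }
        return StreamSupport.stream(iterable.spliterator(), false)
            .collect(Collectors.toList());
    }

    /**
     * Returns the stored elements. Only when timestamps must be rewritten on read
     * (the hypothetical updateOnRead flag) is the Iterable turned into a List.
     */
    public static <E> Iterable<E> read(Iterable<E> stored, boolean updateOnRead) {
        Iterable<E> safe = stored == null ? Collections.<E>emptyList() : stored;
        if (!updateOnRead) {
            return safe; // returned as-is: no element has been iterated yet
        }
        List<E> materialised = collect(safe);
        // A real wrapper would filter, re-stamp and write back here before returning.
        return materialised;
    }

    public static void main(String[] args) {
        List<String> list = new ArrayList<>(Collections.singletonList("x"));
        Iterable<String> lazy = list::iterator; // an Iterable that is not a List
        System.out.println(collect(list) == list);     // prints true
        System.out.println(collect(lazy) == lazy);     // prints false
        System.out.println(read(lazy, false) == lazy); // prints true
    }
}
```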


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-28 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198947650
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction<T, ACC>
+   extends AbstractTtlDecorator<FoldFunction<T, ACC>>
+   implements FoldFunction<T, TtlValue<ACC>> {
+   TtlFoldFunction(FoldFunction<T, ACC> original, TtlConfig config, TtlTimeProvider timeProvider) {
+   super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), value));
--- End diff --

It should be covered by `updateExpired` in `testExactExpirationOnWrite`.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-28 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198757244
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+   @Override
+   public TtlValue<ACC> fold(TtlValue<ACC> accumulator, T value) throws Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), value));
--- End diff --

As I understand it, the wrapped states should already provide the default values. My idea was to wrap the original default value [in the TTL factory](https://github.com/apache/flink/pull/6196/commits/4dedb9a20244a2addd337617778b17fe8349#diff-13011fbe7c28b56b994783572b461aaeR174) with the expiration timestamp `Long.MAX_VALUE`, so that it basically never expires. Good point about test cases for it; I will add them for the appending states.
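Here is a minimal sketch of the "never expiring default" idea. It assumes that the stored holder records an absolute expiration timestamp. That layout is an illustrative assumption: `ExpiringValue`, `wrapDefault` and `wrapWritten` are hypothetical names, not the `TtlValue` class from the PR.

```java
/** Hypothetical sketch: a default value stamped so that it never expires. */
public class NeverExpiringDefault {

    /** Hypothetical holder storing an absolute expiration time in ms. */
    public static final class ExpiringValue<V> {
        final V value;
        final long expirationTimestamp;

        ExpiringValue(V value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }

        /** Expired strictly after the expiration timestamp, so Long.MAX_VALUE never expires. */
        public boolean isExpired(long now) {
            return now > expirationTimestamp;
        }
    }

    /** Wraps the descriptor's default value with the maximal expiration timestamp. */
    public static <V> ExpiringValue<V> wrapDefault(V defaultValue) {
        return new ExpiringValue<>(defaultValue, Long.MAX_VALUE);
    }

    /** Wraps a written user value; assumes now + ttl does not overflow. */
    public static <V> ExpiringValue<V> wrapWritten(V value, long now, long ttl) {
        return new ExpiringValue<>(value, now + ttl);
    }

    public static void main(String[] args) {
        ExpiringValue<Integer> def = wrapDefault(0);
        ExpiringValue<Integer> written = wrapWritten(5, 100L, 50L);
        System.out.println(def.isExpired(Long.MAX_VALUE)); // prints false
        System.out.println(written.isExpired(150L));       // prints false
        System.out.println(written.isExpired(151L));       // prints true
    }
}
```

A holder that stores a last-access timestamp and adds the TTL on every check would need overflow care, because `Long.MAX_VALUE + ttl` wraps around.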


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-28 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198751723
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,132 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <UK> Type of the user entry key of state with TTL
+ * @param <UV> Type of the user entry value of state with TTL
+ */
+class TtlMapState<K, N, UK, UV>
+   extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
+   implements InternalMapState<K, N, UK, UV> {
+   TtlMapState(
+   InternalMapState<K, N, UK, TtlValue<UV>> original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<Map<UK, UV>> valueSerializer) {
+   super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+   return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map<UK, UV> map) throws Exception {
+   if (map == null) {
+   return;
+   }
+   Map<UK, TtlValue<UV>> ttlMap = map.entrySet().stream()
+   .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
+   original.putAll(ttlMap);
+   }
+
+   @Override
+   public void remove(UK key) throws Exception {
+   original.remove(key);
+   }
+
+   @Override
+   public boolean contains(UK key) throws Exception {
+   return get(key) != null;
+   }
+
+   @Override
+   public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
+   return entriesStream()::iterator;
+   }
+
+   private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
+   Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
+   withTs = withTs == null ? Collections.emptyList() : withTs;
+   return StreamSupport
+   .stream(withTs.spliterator(), false)

As I understand it, it depends on the use case. For parallelizable, lazy operations over big collections, such as filter and map over lists, streams will give a boost over loops. For short collections or non-parallelizable spliterators, however, the overhead can hurt performance. It might be hard to predict which type of spliterator is used, though. I agree that real benchmarking should be done to make sure.
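For reference, here is the filter-then-restamp step (as in `updateTs` in the `TtlListState` diff) written both ways. It uses a hypothetical `Stamped` holder and an assumed expiry rule `timestamp + ttl <= now`. The sketch only shows that the two forms are interchangeable. Which one is faster depends on collection size and spliterator, so it is not a substitute for the benchmark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: one filter-then-restamp step written as a stream and as a loop. */
public class StreamVsLoop {

    /** Hypothetical holder: a user value plus the timestamp (ms) of its last update. */
    public static final class Stamped {
        final String value;
        final long timestamp;

        public Stamped(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return value + "@" + timestamp;
        }
    }

    /** Sequential stream: drop expired entries, re-stamp the rest with now. */
    public static List<Stamped> refreshWithStream(List<Stamped> in, long now, long ttl) {
        return in.stream()
            .filter(v -> v.timestamp + ttl > now)
            .map(v -> new Stamped(v.value, now))
            .collect(Collectors.toList());
    }

    /** Plain loop doing the same work without setting up a stream pipeline. */
    public static List<Stamped> refreshWithLoop(List<Stamped> in, long now, long ttl) {
        List<Stamped> out = new ArrayList<>(in.size());
        for (Stamped v : in) {
            if (v.timestamp + ttl > now) {
                out.add(new Stamped(v.value, now));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Stamped> in = new ArrayList<>();
        in.add(new Stamped("a", 0L));
        in.add(new Stamped("b", 90L));
        System.out.println(refreshWithStream(in, 100L, 50L)); // prints [b@100]
        System.out.println(refreshWithLoop(in, 100L, 50L));   // prints [b@100]
    }
}
```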


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-27 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198439894
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
--- End diff --

Currently this class is used to wrap any object with TTL logic, not only state objects (e.g. aggregating functions). It could be injected as a "TTL time service" instead, but then the common member `original` would have to be duplicated in the function wrappers.
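Here is a minimal sketch of why a shared base class is convenient. Both state wrappers and function wrappers need the wrapped `original` plus the same expiry check, so keeping them in one abstract decorator avoids duplicating that member. All names here are hypothetical: `LongSupplier` stands in for `TtlTimeProvider`, and `TtlReduce` is an illustration, not a Flink class.

```java
import java.util.function.BinaryOperator;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a shared TTL decorator base class (not Flink's code). */
public class TtlDecoratorSketch {

    /** Keeps the wrapped object and the TTL helpers that every wrapper needs. */
    abstract static class AbstractDecorator<T> {
        final T original;
        final long ttl;
        final LongSupplier clock; // stands in for a TTL time provider

        AbstractDecorator(T original, long ttl, LongSupplier clock) {
            this.original = original;
            this.ttl = ttl;
            this.clock = clock;
        }

        boolean expired(long timestamp) {
            return timestamp + ttl <= clock.getAsLong();
        }
    }

    /** Hypothetical holder: a value plus the time (ms) it was last written. */
    static final class Stamped<V> {
        final V value;
        final long timestamp;

        Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** A function wrapper reusing the base: an expired accumulator counts as absent. */
    static final class TtlReduce<V> extends AbstractDecorator<BinaryOperator<V>>
            implements BinaryOperator<Stamped<V>> {

        TtlReduce(BinaryOperator<V> original, long ttl, LongSupplier clock) {
            super(original, ttl, clock);
        }

        @Override
        public Stamped<V> apply(Stamped<V> acc, Stamped<V> in) {
            V merged = (acc == null || expired(acc.timestamp))
                ? in.value
                : original.apply(acc.value, in.value);
            return new Stamped<>(merged, clock.getAsLong());
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};
        TtlReduce<Integer> sum = new TtlReduce<>(Integer::sum, 10L, () -> now[0]);
        Stamped<Integer> acc = sum.apply(null, new Stamped<>(1, 0L));
        now[0] = 5L;
        acc = sum.apply(acc, new Stamped<>(2, 5L));
        System.out.println(acc.value); // prints 3
        now[0] = 20L;
        acc = sum.apply(acc, new Stamped<>(4, 20L));
        System.out.println(acc.value); // prints 4: the old accumulator had expired
    }
}
```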


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-27 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198439148
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
+ */
+public enum TtlStateVisibility {
+   /** Return still available expired user value (not yet cleaned up). */
+   Relaxed,
+   /** Hide expired user value and behave as if it does not exist any more. */
+   Exact
--- End diff --

I think I will also rename enum entries.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-27 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198436490
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java
 ---
@@ -0,0 +1,31 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option value configures when to prolong state TTL.
+ */
+public enum TtlUpdateType {
+   /** TTL is disabled. State does not expire. */
+   Disabled,
--- End diff --

This is more of an internal configuration value, mainly for later use in the TTL wrapper factory to decide whether to wrap the state with TTL or not. The alternative would probably be to store a null or `Optional` config, or a negative TTL, in the state descriptor when it is configured without TTL by default. I found this approach more explicit. Although the option is user-facing, I expected that users would rather just not configure TTL at all than use it.
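To make the trade-off concrete, here is a minimal sketch of the two options discussed: an explicit `Disabled` constant checked by the factory, versus an absent (`Optional.empty()`) config. `Config`, `UpdateType` and `shouldWrap` are hypothetical names. Only the constants `Disabled` and `OnReadAndWrite` are taken from the diff.

```java
import java.util.Optional;

/** Hypothetical sketch: two ways to tell a wrapper factory that TTL is off. */
public class TtlDisabledOptions {

    /** Variant 1: an explicit constant, like the Disabled entry in the diff. */
    enum UpdateType { Disabled, OnReadAndWrite }

    /** Hypothetical config holding the update type and the TTL in ms. */
    static final class Config {
        final UpdateType updateType;
        final long ttlMillis;

        Config(UpdateType updateType, long ttlMillis) {
            this.updateType = updateType;
            this.ttlMillis = ttlMillis;
        }
    }

    /** Factory check for variant 1: wrap unless the config says Disabled. */
    static boolean shouldWrap(Config config) {
        return config.updateType != UpdateType.Disabled;
    }

    /** Variant 2: no config at all (Optional.empty()) means "no TTL". */
    static boolean shouldWrap(Optional<Config> config) {
        return config.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(shouldWrap(new Config(UpdateType.Disabled, 0L)));          // prints false
        System.out.println(shouldWrap(new Config(UpdateType.OnReadAndWrite, 1000L))); // prints true
        System.out.println(shouldWrap(Optional.<Config>empty()));                     // prints false
    }
}
```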


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-27 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198431042
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+   }
+
+TtlValue wrapWithTs(V value) {
--- End diff --

It actually wraps the value with its expiration timestamp; the TTL itself is not stored with the value.
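As a sketch of the distinction: only the absolute expiration timestamp travels with the value, while the TTL duration stays in the config. `ExpiringValue` and `wrapWithExpirationTimestamp` are hypothetical names; the overflow cap mirrors `newExpirationTimestamp()` from the diff:

```java
// Simplified stand-in for TtlValue (names hypothetical): only the user value and its
// absolute expiration timestamp are stored; the TTL duration lives in the config.
final class ExpiringValue<V> {
    final V userValue;
    final long expirationTimestamp;

    ExpiringValue(V userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }

    // now + ttl, capped at Long.MAX_VALUE instead of overflowing.
    static long computeExpirationTimestamp(long now, long ttl) {
        long ttlWithoutOverflow = now > 0 ? Math.min(Long.MAX_VALUE - now, ttl) : ttl;
        return now + ttlWithoutOverflow;
    }

    // A more descriptive name for wrapWithTs; null stays null, as in the diff.
    static <V> ExpiringValue<V> wrapWithExpirationTimestamp(V value, long now, long ttl) {
        return value == null ? null : new ExpiringValue<V>(value, computeExpirationTimestamp(now, ttl));
    }

    boolean isExpired(long now) {
        return expirationTimestamp <= now;
    }
}
```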


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-27 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r198430741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,66 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
--- End diff --

Yes, the config is only sketched for now. The builder is planned for a later
step, when TTL is activated in the State Descriptor API.
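For illustration only, a builder for such a config could look roughly like this. `TtlConfigSketch`, its fields and the defaults are assumptions, not the planned API:

```java
import java.util.Objects;

// Simplified, immutable stand-in for TtlConfig; all names and defaults are hypothetical.
final class TtlConfigSketch {
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }

    final long ttlMillis;
    final UpdateType updateType;
    final boolean returnExpiredIfNotCleanedUp;

    private TtlConfigSketch(Builder b) {
        this.ttlMillis = b.ttlMillis;
        this.updateType = b.updateType;
        this.returnExpiredIfNotCleanedUp = b.returnExpiredIfNotCleanedUp;
    }

    static Builder newBuilder(long ttlMillis) {
        return new Builder(ttlMillis);
    }

    static final class Builder {
        private final long ttlMillis;
        private UpdateType updateType = UpdateType.OnCreateAndWrite; // assumed default
        private boolean returnExpiredIfNotCleanedUp = false;          // assumed default

        private Builder(long ttlMillis) {
            if (ttlMillis <= 0) {
                throw new IllegalArgumentException("TTL must be positive");
            }
            this.ttlMillis = ttlMillis;
        }

        Builder setUpdateType(UpdateType updateType) {
            this.updateType = Objects.requireNonNull(updateType);
            return this;
        }

        Builder returnExpiredIfNotCleanedUp() {
            this.returnExpiredIfNotCleanedUp = true;
            return this;
        }

        TtlConfigSketch build() {
            return new TtlConfigSketch(this);
        }
    }
}
```

A builder also gives one place to validate arguments, so the wrappers can assume a consistent config.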


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197719642
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
 ---
@@ -0,0 +1,91 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.internal.InternalListState;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Test suite for {@link TtlListState}. */
+public class TtlListStateTest extends TtlStateTestBase, List, Iterable> {
+   @Override
+   TtlListState createState() {
+   return new TtlListState<>(new MockInternalListState<>(), ttlConfig, timeProvider, null);
+   }
+
+   @Override
+   void initTestValues() {
+   updater = v -> ttlState.addAll(v);
+   getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
+   originalGetter = () -> ttlState.original.get();
+
+   emptyValue = Collections.emptyList();
+
+   updateValue1 = Arrays.asList(5, 7, 10);
+   updateValue2 = Arrays.asList(8, 9, 11);
+
+   getValue1 = updateValue1;
+   getValue2 = updateValue2;
+   }
+
+   private static class MockInternalListState
+   extends MockInternalKvState>
+   implements InternalListState {
+
+   MockInternalListState() {
+   value = new ArrayList<>();
+   }
+
+   @Override
+   public void update(List elements) {
+   updateInternal(elements);
+   }
+
+   @Override
+   public void addAll(List elements) {
+   value.addAll(elements);
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) {
--- End diff --

Wouldn't it make sense to check that merging namespaces also works
correctly with the TTL state, and to define and validate the contract for what
has to happen to the timestamps in this case?
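One possible contract, assumed purely for illustration (the comment leaves it open): merged elements keep their own expiration timestamps, and elements that are already expired are dropped during the merge. A sketch with hypothetical names:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One possible merge contract (assumed): merging does not refresh timestamps,
// and elements that are already expired are dropped.
final class TtlListMergeSketch {
    static final class TimestampedElement {
        final int value;
        final long expirationTimestamp;

        TimestampedElement(int value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    // namespace -> timestamped list elements (a single key, for brevity)
    final Map<String, List<TimestampedElement>> byNamespace = new HashMap<>();

    void mergeNamespaces(String target, Collection<String> sources, long now) {
        List<TimestampedElement> merged = new ArrayList<>();
        List<TimestampedElement> existing = byNamespace.remove(target);
        if (existing != null) {
            merged.addAll(existing);
        }
        for (String source : sources) {
            List<TimestampedElement> elements = byNamespace.remove(source);
            if (elements != null) {
                merged.addAll(elements);
            }
        }
        // Keep the original timestamps; remove what has already expired.
        merged.removeIf(e -> e.expirationTimestamp <= now);
        if (!merged.isEmpty()) {
            byNamespace.put(target, merged);
        }
    }
}
```

A test for the real `TtlListState` could assert the same properties against whatever contract is finally chosen.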


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197719310
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
 ---
@@ -0,0 +1,153 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+abstract class TtlStateTestBase {
--- End diff --

I think it would make sense to extend this general test a bit to consider
multiple keys and namespaces. Ideally, a test should cover the full
contract specification of the tested subject. What I mean is, you could
currently pass this test even if TTL accidentally cleared all states on the
timeout of one state, or cleared all the states in the same namespace. The
mock states can easily be extended to truly scope values by key and namespace.
Then the test can, for example, create two keys in the same namespace and two
keys in a different namespace, and check that their timeouts are isolated from
each other and that the interaction works as expected.
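A minimal sketch of such a scoped mock, assuming a simplified value-state API with an explicit `now` argument (`ScopedTtlMockState` and its methods are hypothetical, not the PR's `MockInternalKvState`):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mock value state that truly scopes values by (key, namespace),
// so a test can check that one entry expiring leaves the others untouched.
final class ScopedTtlMockState<K, N, V> {
    private final Map<List<Object>, V> values = new HashMap<>();
    private final Map<List<Object>, Long> expiresAt = new HashMap<>();
    private final long ttl;
    private K currentKey;
    private N currentNamespace;

    ScopedTtlMockState(long ttl) {
        this.ttl = ttl;
    }

    void setCurrentKey(K key) { this.currentKey = key; }

    void setCurrentNamespace(N namespace) { this.currentNamespace = namespace; }

    // Composite (key, namespace) scope; List implements equals/hashCode element-wise.
    private List<Object> scope() {
        return Arrays.asList(currentKey, currentNamespace);
    }

    void update(V value, long now) {
        values.put(scope(), value);
        expiresAt.put(scope(), now + ttl);
    }

    V get(long now) {
        List<Object> scope = scope();
        Long expiration = expiresAt.get(scope);
        if (expiration == null) {
            return null;
        }
        if (expiration <= now) {
            // Expired: clean up only the current scope, never its neighbours.
            values.remove(scope);
            expiresAt.remove(scope);
            return null;
        }
        return values.get(scope);
    }
}
```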


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-25 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197714098
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,172 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
--- End diff --

Currently this could return `List` instead of `Iterable`, which might let you
get around the `instanceof` check.
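This works because Java allows an overriding method to declare a narrower (covariant) return type. A minimal sketch with hypothetical stand-in types, not Flink's interfaces:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for an interface whose get() is declared to return Iterable.
interface AppendingLike<T> {
    Iterable<T> get() throws Exception;
}

// Hypothetical wrapper: the override narrows the return type to List, so code that
// holds the concrete type can use the List API without an instanceof check.
final class ListBackedState<T> implements AppendingLike<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T value) {
        elements.add(value);
    }

    @Override
    public List<T> get() {
        return new ArrayList<>(elements); // defensive copy of the current elements
    }

    int sizeOfCurrent() {
        return get().size(); // List API available directly, no cast needed
    }
}
```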


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197503773
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
 ---
@@ -0,0 +1,80 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the value added to the state
+ * @param  The type of the accumulator (intermediate aggregate state).
+ * @param  Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState
+   extends AbstractTtlState, InternalAggregatingState, OUT>>
+   implements InternalAggregatingState {
+
+   TtlAggregatingState(
+   InternalAggregatingState, OUT> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer valueSerializer,
+   TtlAggregateFunction aggregateFunction) {
+   super(originalState, config, timeProvider, valueSerializer);
+   aggregateFunction.stateClear = originalState::clear;
--- End diff --

Maybe it is better to pass a `TtlAggregateFunctionBuilder` and then supply
`stateClear` and `updater` before the object is created. I think they could
then also become immutable. The same applies to the other states.
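A minimal sketch of the idea, assuming the callbacks can be modelled as plain `Runnable`s (the real fields may use Flink's throwing functional interfaces). `TtlFunctionSketch` and its `Builder` are hypothetical:

```java
import java.util.Objects;

// Hypothetical sketch: callbacks are collected in a builder first, so the function
// object keeps them in final fields instead of having them assigned after construction.
final class TtlFunctionSketch {
    private final Runnable stateClear;
    private final Runnable updater;

    private TtlFunctionSketch(Runnable stateClear, Runnable updater) {
        this.stateClear = Objects.requireNonNull(stateClear, "stateClear");
        this.updater = Objects.requireNonNull(updater, "updater");
    }

    void onExpired() { stateClear.run(); }

    void onAccess() { updater.run(); }

    static final class Builder {
        private Runnable stateClear;
        private Runnable updater;

        Builder stateClear(Runnable stateClear) {
            this.stateClear = stateClear;
            return this;
        }

        Builder updater(Runnable updater) {
            this.updater = updater;
            return this;
        }

        // Fails fast if a callback was forgotten, instead of failing later on first use.
        TtlFunctionSketch build() {
            return new TtlFunctionSketch(stateClear, updater);
        }
    }
}
```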


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197502165
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param  Type of the values folded into the state
+ * @param  Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction
+   extends AbstractTtlDecorator>
+   implements FoldFunction> {
+   TtlFoldFunction(FoldFunction original, TtlConfig config, TtlTimeProvider timeProvider) {
+   super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue fold(TtlValue accumulator, T value) throws Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), value));
--- End diff --

If this is true, it is a valuable case to test for all the appending
states, to make sure they work correctly.
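The test case can be illustrated with a simplified reducing state that keeps one TTL timestamp for the whole accumulator. `TtlReducingSketch` is hypothetical and takes the current time as an argument. The property under test: adding to an expired state must start from scratch instead of folding the expired accumulator in.

```java
import java.util.function.BinaryOperator;

// Hypothetical reducing state with a single TTL timestamp for the accumulator.
final class TtlReducingSketch {
    private final long ttl;
    private final BinaryOperator<Integer> reducer;
    private Integer accumulator; // null when empty
    private long expiresAt;

    TtlReducingSketch(long ttl, BinaryOperator<Integer> reducer) {
        this.ttl = ttl;
        this.reducer = reducer;
    }

    void add(int value, long now) {
        // An expired accumulator is treated as absent, so it is not reduced into the new value.
        Integer current = (accumulator != null && expiresAt > now) ? accumulator : null;
        accumulator = current == null ? value : reducer.apply(current, value);
        expiresAt = now + ttl;
    }

    Integer get(long now) {
        return (accumulator != null && expiresAt > now) ? accumulator : null;
    }
}
```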


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197501902
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlFoldFunction.java
 ---
@@ -0,0 +1,43 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+
+/**
+ * This class wraps folding function with TTL logic.
+ *
+ * @param  Type of the values folded into the state
+ * @param  Type of the value in the state
+ *
+ * @deprecated use {@link TtlAggregateFunction} instead
+ */
+@Deprecated
+class TtlFoldFunction
+   extends AbstractTtlDecorator>
+   implements FoldFunction> {
+   TtlFoldFunction(FoldFunction original, TtlConfig config, TtlTimeProvider timeProvider) {
+   super(original, config, timeProvider);
+   }
+
+   @Override
+   public TtlValue fold(TtlValue accumulator, T value) throws Exception {
+   return wrapWithTs(original.fold(getUnexpried(accumulator), value));
--- End diff --

I think that (similar to `TtlAggregateFunction`) you need to intercept
`null` values of the accumulator here and replace them with the default value.
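A minimal sketch of that interception, assuming `TtlValue` is reduced to a small `Stamped` holder and `FoldFunction` to a `BiFunction` (all names hypothetical). A null or expired accumulator is replaced by the initial value before the user function is called:

```java
import java.util.function.BiFunction;

// Simplified sketch: the user fold function never receives a null accumulator.
final class TtlFoldSketch<T, ACC> {
    static final class Stamped<A> {
        final A value;
        final long expirationTimestamp;

        Stamped(A value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    private final BiFunction<ACC, T, ACC> foldFunction;
    private final ACC initialValue;
    private final long ttl;

    TtlFoldSketch(BiFunction<ACC, T, ACC> foldFunction, ACC initialValue, long ttl) {
        this.foldFunction = foldFunction;
        this.initialValue = initialValue;
        this.ttl = ttl;
    }

    Stamped<ACC> fold(Stamped<ACC> accumulator, T value, long now) {
        boolean usable = accumulator != null && accumulator.expirationTimestamp > now;
        // Intercept missing or expired accumulators and restart from the initial value.
        ACC current = usable ? accumulator.value : initialValue;
        return new Stamped<>(foldFunction.apply(current, value), now + ttl);
    }
}
```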


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197498126
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,132 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry key of state with TTL
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlMapState
+   extends AbstractTtlState, Map>, InternalMapState>>
+   implements InternalMapState {
+   TtlMapState(
+   InternalMapState> original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+   return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map map) throws Exception {
+   if (map == null) {
+   return;
+   }
+   Map> ttlMap = map.entrySet().stream()
+   .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
+   original.putAll(ttlMap);
+   }
+
+   @Override
+   public void remove(UK key) throws Exception {
+   original.remove(key);
+   }
+
+   @Override
+   public boolean contains(UK key) throws Exception {
+   return get(key) != null;
+   }
+
+   @Override
+   public Iterable> entries() throws Exception {
+   return entriesStream()::iterator;
+   }
+
+   private Stream> entriesStream() throws Exception {
+   Iterable>> withTs = original.entries();
+   withTs = withTs == null ? Collections.emptyList() : withTs;
+   return StreamSupport
+   .stream(withTs.spliterator(), false)
+   .filter(this::unexpiredAndUpdateOrCleanup)
+   .map(TtlMapState::dropTs);
+   }
+
+   private boolean unexpiredAndUpdateOrCleanup(Map.Entry> e) {
+   UV unexpiredValue;
+   try {
+   unexpiredValue = getWithTtlCheckAndUpdate(
+   e::getValue,
+   v -> original.put(e.getKey(), v),
+   () -> original.remove(e.getKey()));
+   } catch (Exception ex) {
+   throw new FlinkRuntimeException(ex);
+   }
+   return unexpiredValue != null;
+   }
+
+   private static  Map.Entry dropTs(Map.Entry> e) {
--- End diff --

Again, for clarity I would spell out `dropTs` as `unwrapTtlState`
or something similar.
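For illustration, a helper along those lines might look like this. Only the signature of `dropTs` is visible in the diff, so the body here is an assumption; `TtlMapEntries` and `unwrapTtlValue` are hypothetical names:

```java
import java.util.AbstractMap;
import java.util.Map;

final class TtlMapEntries {
    // Simplified stand-in for TtlValue<V>.
    static final class TtlValue<V> {
        final V userValue;
        final long expirationTimestamp;

        TtlValue(V userValue, long expirationTimestamp) {
            this.userValue = userValue;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    // Assumed to match what dropTs does: keep the key, strip the TTL wrapper from the value.
    static <UK, UV> Map.Entry<UK, UV> unwrapTtlValue(Map.Entry<UK, TtlValue<UV>> e) {
        return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().userValue);
    }
}
```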


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197494851
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,132 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry key of state with TTL
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlMapState
+   extends AbstractTtlState, Map>, InternalMapState>>
+   implements InternalMapState {
+   TtlMapState(
+   InternalMapState> original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+   return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map map) throws Exception {
+   if (map == null) {
+   return;
+   }
+   Map> ttlMap = map.entrySet().stream()
+   .collect(Collectors.toMap(Map.Entry::getKey, e -> wrapWithTs(e.getValue())));
+   original.putAll(ttlMap);
+   }
+
+   @Override
+   public void remove(UK key) throws Exception {
+   original.remove(key);
+   }
+
+   @Override
+   public boolean contains(UK key) throws Exception {
+   return get(key) != null;
+   }
+
+   @Override
+   public Iterable> entries() throws Exception {
+   return entriesStream()::iterator;
+   }
+
+   private Stream> entriesStream() throws Exception {
+   Iterable>> withTs = original.entries();
+   withTs = withTs == null ? Collections.emptyList() : withTs;
+   return StreamSupport
+   .stream(withTs.spliterator(), false)
--- End diff --

I saw that some of the classes make heavy use of streams built from
collections' spliterators. While the code is concise, this creates a default
iterator-to-spliterator adapter under the hood. IIRC this can have a
significant performance impact, especially on hot code paths, and state access
in Flink can be considered a hot path in some cases. It is hard to quantify the
impact just by looking at the code, but please be aware of it when using this
kind of API adapter in "low-level" classes. We might want to look at the
performance and tune it if needed. It should be OK for now because there is no
regression in existing code, but we might want to measure this for heap-based
state eventually.
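If measurements show that the adapter matters, one option is a plain lazy iterator that filters and maps without going through streams. The sketch below is hypothetical (`FilterMapIterator` is not from the PR), and whether it is actually faster would have to be measured. Note that it evaluates the filter one element ahead, which matters if the filter has side effects such as cleanup:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical lazy filter-and-map iterator; it prefetches one matching element ahead.
final class FilterMapIterator<T, R> implements Iterator<R> {
    private final Iterator<T> source;
    private final Predicate<T> filter;
    private final Function<T, R> mapper;
    private T next;
    private boolean hasNext;

    FilterMapIterator(Iterator<T> source, Predicate<T> filter, Function<T, R> mapper) {
        this.source = source;
        this.filter = filter;
        this.mapper = mapper;
        advance();
    }

    // Moves to the next element that passes the filter, if any.
    private void advance() {
        hasNext = false;
        next = null;
        while (source.hasNext()) {
            T candidate = source.next();
            if (filter.test(candidate)) {
                next = candidate;
                hasNext = true;
                return;
            }
        }
    }

    @Override
    public boolean hasNext() {
        return hasNext;
    }

    @Override
    public R next() {
        if (!hasNext) {
            throw new NoSuchElementException();
        }
        T current = next;
        advance();
        return mapper.apply(current);
    }
}
```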


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197482482
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+   }
+
+TtlValue wrapWithTs(V value) {
+   return wrapWithTs(value, newExpirationTimestamp());
+   }
+
+   static  TtlValue wrapWithTs(V value, long ts) {
+   return value == null ? null : new TtlValue<>(value, ts);
+   }
+
+TtlValue rewrapWithNewTs(TtlValue ttlValue) {
+   return wrapWithTs(ttlValue.getUserValue());
+   }
+
+   private long newExpirationTimestamp() {
+   long currentTs = timeProvider.currentTimestamp();
+   long ttlWithoutOverflow = currentTs > 0 ? Math.min(Long.MAX_VALUE - currentTs, ttl) : ttl;
+   return currentTs + ttlWithoutOverflow;
+   }
+
+
V getWithTtlCheckAndUpdate(
--- End diff --

It also seems like this method might fit better into `AbstractTtlState`; for
example, there you could access `clear()` directly.
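A simplified sketch of what that could look like for a value state (hypothetical `TtlValueStateSketch`, current time passed explicitly). Because the check-and-update logic lives in the state class, an expired entry is cleaned up by calling `clear()` directly, without a cleanup callback:

```java
// Hypothetical value state holding the TTL check-and-update logic itself.
final class TtlValueStateSketch<V> {
    private final long ttl;
    private final boolean updateTsOnRead;
    private V value;
    private long expiresAt;

    TtlValueStateSketch(long ttl, boolean updateTsOnRead) {
        this.ttl = ttl;
        this.updateTsOnRead = updateTsOnRead;
    }

    void update(V newValue, long now) {
        value = newValue;
        expiresAt = now + ttl;
    }

    void clear() {
        value = null;
    }

    V get(long now) {
        if (value == null) {
            return null;
        }
        if (expiresAt <= now) {
            clear(); // direct access, no callback needed
            return null;
        }
        if (updateTsOnRead) {
            expiresAt = now + ttl; // prolong TTL on read (OnReadAndWrite)
        }
        return value;
    }
}
```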


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197480035
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
--- End diff --

I wonder whether it is better to have this as an abstract base class, or to
prefer composition over inheritance for `AbstractTtlState`. It has no abstract
methods and no methods are overridden by subclasses, so it could just as well
be a non-abstract class referenced by `AbstractTtlState`.
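A minimal sketch of the composition variant: the shared TTL helpers move into a plain, non-abstract class, and each TTL state holds a reference to it instead of extending it. `TtlHelper` and `ComposedTtlState` are hypothetical names, and overflow handling is omitted for brevity:

```java
import java.util.function.LongSupplier;

// Plain helper with the shared TTL logic (overflow handling omitted for brevity).
final class TtlHelper {
    private final long ttl;
    private final LongSupplier clock;

    TtlHelper(long ttl, LongSupplier clock) {
        this.ttl = ttl;
        this.clock = clock;
    }

    long newExpirationTimestamp() {
        return clock.getAsLong() + ttl;
    }

    boolean expired(long expirationTimestamp) {
        return expirationTimestamp <= clock.getAsLong();
    }
}

// The state "has a" helper rather than "is a" decorator.
final class ComposedTtlState {
    private final TtlHelper ttlHelper;
    private String value;
    private long expiresAt;

    ComposedTtlState(TtlHelper ttlHelper) {
        this.ttlHelper = ttlHelper;
    }

    void update(String newValue) {
        value = newValue;
        expiresAt = ttlHelper.newExpirationTimestamp();
    }

    String get() {
        return value == null || ttlHelper.expired(expiresAt) ? null : value;
    }
}
```

With composition the state classes remain free to extend something else, and the helper can be unit-tested on its own with a fake clock.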


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197477831
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
--- End diff --

"can" be returned. we accept it in relaxed, we don't enforce it.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197476858
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
+ */
+public enum TtlStateVisibility {
+   /** Return still available expired user value (not yet cleaned up). */
+   Relaxed,
+   /** Hide expired user value and behave as if it does not exist any more. */
+   Exact
--- End diff --

And here: expired state is never returned to the user. I think it does not 
matter whether it is just hidden or deleted.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197476710
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
+ */
+public enum TtlStateVisibility {
+   /** Return still available expired user value (not yet cleaned up). */
+   Relaxed,
--- End diff --

I would explain this differently in the comment: it means that expired state 
can be returned if it has not been cleaned up yet.
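To make the two modes concrete, here is a minimal sketch of how a read could treat an entry that has expired but has not been cleaned up yet. The enum mirrors `TtlStateVisibility` from the diff; the `VisibilitySketch.read` helper and its parameters are hypothetical.

```java
/** Mirrors TtlStateVisibility from the diff. */
enum Visibility {
    /** Expired state can be returned if it has not been cleaned up yet. */
    Relaxed,
    /** Expired state is never returned to the user. */
    Exact
}

final class VisibilitySketch {
    /**
     * Hypothetical read path: returns the stored user value, or null if it is
     * expired and the visibility setting forbids returning it.
     */
    static <V> V read(V userValue, long expirationTs, long now, Visibility visibility) {
        if (userValue == null) {
            return null; // nothing stored
        }
        boolean expired = expirationTs <= now;
        if (expired && visibility == Visibility.Exact) {
            return null; // hidden, whether or not it is physically deleted yet
        }
        return userValue; // unexpired, or expired but tolerated in Relaxed mode
    }
}
```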


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197475873
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java
 ---
@@ -0,0 +1,31 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option value configures when to prolong state TTL.
+ */
+public enum TtlUpdateType {
+   /** TTL is disabled. State does not expire. */
+   Disabled,
--- End diff --

Seems like this is never needed? Why would somebody register TTL state and 
then declare it disabled?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197475080
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUpdateType.java
 ---
@@ -0,0 +1,31 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option value configures when to prolong state TTL.
+ */
+public enum TtlUpdateType {
--- End diff --

Why not declare the enums inside the `TtlConfig` class, where they belong?
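A minimal sketch of what nesting the enums in the config class could look like. The class name `TtlConfigSketch` and the nested enum names are hypothetical. `OnReadAndWrite`, `Relaxed` and `Exact` come from the diff, while `OnCreateAndWrite` and `ProcessingTime` are assumptions because the excerpts here do not show those constants. `Disabled` is left out, following the question above about whether it is ever needed.

```java
import java.util.Objects;

/** Hypothetical config class with its option enums nested inside it. */
final class TtlConfigSketch {

    /** When to prolong the TTL (constant names partly assumed). */
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }

    /** Whether expired but not yet cleaned-up values may be returned. */
    enum StateVisibility { Relaxed, Exact }

    /** Time scale used for TTL (only processing time assumed here). */
    enum TimeCharacteristic { ProcessingTime }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;

    TtlConfigSketch(
            UpdateType updateType,
            StateVisibility stateVisibility,
            TimeCharacteristic timeCharacteristic) {
        this.updateType = Objects.requireNonNull(updateType);
        this.stateVisibility = Objects.requireNonNull(stateVisibility);
        this.timeCharacteristic = Objects.requireNonNull(timeCharacteristic);
    }

    UpdateType getUpdateType() { return updateType; }

    StateVisibility getStateVisibility() { return stateVisibility; }

    TimeCharacteristic getTimeCharacteristic() { return timeCharacteristic; }
}
```

Call sites then read as `TtlConfigSketch.UpdateType.OnReadAndWrite`, which keeps the options visibly tied to the class they configure.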


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197475154
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateVisibility.java
 ---
@@ -0,0 +1,29 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures whether to return expired user value or not.
+ */
+public enum TtlStateVisibility {
--- End diff --

Same here.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197475245
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeCharacteristic.java
 ---
@@ -0,0 +1,27 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * This option configures time scale to use for ttl.
+ */
+public enum TtlTimeCharacteristic {
--- End diff --

Same here.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197474567
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,66 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
--- End diff --

Was this forgotten or planned for a later commit?
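For reference, a minimal sketch of the kind of builder the `TODO` hints at. Everything in it (`TtlConfigBuilderSketch`, `newBuilder`, the setters, and the defaults of on-create-and-write updates and exact visibility) is hypothetical and not taken from the PR. The TTL is a plain millisecond count instead of Flink's `Time`, and the non-negative check mirrors the `>= 0` precondition in the diff.

```java
import java.util.Objects;

/** Hypothetical immutable TTL config created through a builder. */
final class TtlConfigBuilderSketch {
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }
    enum StateVisibility { Relaxed, Exact }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final long ttlMillis;

    private TtlConfigBuilderSketch(UpdateType updateType, StateVisibility stateVisibility, long ttlMillis) {
        this.updateType = updateType;
        this.stateVisibility = stateVisibility;
        this.ttlMillis = ttlMillis;
    }

    UpdateType getUpdateType() { return updateType; }
    StateVisibility getStateVisibility() { return stateVisibility; }
    long getTtlMillis() { return ttlMillis; }

    /** The TTL is the only mandatory setting, so it goes into the factory method. */
    static Builder newBuilder(long ttlMillis) {
        return new Builder(ttlMillis);
    }

    static final class Builder {
        private final long ttlMillis;
        private UpdateType updateType = UpdateType.OnCreateAndWrite;     // assumed default
        private StateVisibility stateVisibility = StateVisibility.Exact; // assumed default

        private Builder(long ttlMillis) {
            if (ttlMillis < 0) {
                throw new IllegalArgumentException("TTL must not be negative");
            }
            this.ttlMillis = ttlMillis;
        }

        Builder setUpdateType(UpdateType updateType) {
            this.updateType = Objects.requireNonNull(updateType);
            return this;
        }

        Builder setStateVisibility(StateVisibility stateVisibility) {
            this.stateVisibility = Objects.requireNonNull(stateVisibility);
            return this;
        }

        TtlConfigBuilderSketch build() {
            return new TtlConfigBuilderSketch(updateType, stateVisibility, ttlMillis);
        }
    }
}
```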


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197460522
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+   }
+
+   <V> TtlValue<V> wrapWithTs(V value) {
--- End diff --

Better name: `wrapWithTtl`.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197459764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
--- End diff --

This method does two things: checking the TTL and unwrapping the value. I 
would split it into two methods to separate concerns and make it more readable.
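A minimal sketch of the suggested split: one method only checks expiration, one only unwraps the value, and the read path composes them. It keeps the semantics of the diff (`null` for an absent value, expired values returned only when `returnExpired` is set). The class name, the nested `TtlValue` stand-in, the `unwrap` helper and the `LongSupplier` clock are hypothetical.

```java
import java.util.function.LongSupplier;

final class SplitConcernsSketch {

    /** Hypothetical stand-in for TtlValue from the diff. */
    static final class TtlValue<V> {
        final V userValue;
        final long expirationTimestamp;

        TtlValue(V userValue, long expirationTimestamp) {
            this.userValue = userValue;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    private final LongSupplier clock; // stands in for TtlTimeProvider
    private final boolean returnExpired;

    SplitConcernsSketch(LongSupplier clock, boolean returnExpired) {
        this.clock = clock;
        this.returnExpired = returnExpired;
    }

    /** Concern 1: is this value past its expiration timestamp? */
    <V> boolean expired(TtlValue<V> ttlValue) {
        return ttlValue != null && ttlValue.expirationTimestamp <= clock.getAsLong();
    }

    /** Concern 2: strip the TTL wrapper, tolerating absent values. */
    static <V> V unwrap(TtlValue<V> ttlValue) {
        return ttlValue == null ? null : ttlValue.userValue;
    }

    /** The read path composes the two concerns explicitly. */
    <V> V getUnexpired(TtlValue<V> ttlValue) {
        if (expired(ttlValue) && !returnExpired) {
            return null;
        }
        return unwrap(ttlValue);
    }
}
```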


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197458219
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
--- End diff --

A few more field comments would be good, especially about the exact meaning 
of those boolean flags. For example, it is not obvious what `returnExpired` 
means for the code.
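One possible way to document the fields, shown on a trimmed-down hypothetical class (`DocumentedTtlFields`). The wording of each comment is a reading of how the flags are derived in the constructor shown in the diffs above (`updateTsOnRead` from `OnReadAndWrite`, `returnExpired` from `Relaxed`), and the `ttl` comment assumes the expiration timestamp is computed as current time plus TTL; none of it is text from the PR.

```java
/** Hypothetical excerpt: the decorator's flags with explanatory field comments. */
final class DocumentedTtlFields {
    /**
     * If true, a successful read also refreshes the expiration timestamp
     * (TtlUpdateType.OnReadAndWrite); otherwise only writes refresh it.
     */
    final boolean updateTsOnRead;

    /**
     * If true, a value that has expired but has not been cleaned up yet may
     * still be returned to the user (TtlStateVisibility.Relaxed); if false,
     * expired values are never returned (TtlStateVisibility.Exact).
     */
    final boolean returnExpired;

    /** Time-to-live in milliseconds, added to the current time when a value is stamped. */
    final long ttl;

    DocumentedTtlFields(boolean updateTsOnRead, boolean returnExpired, long ttl) {
        this.updateTsOnRead = updateTsOnRead;
        this.returnExpired = returnExpired;
        this.ttl = ttl;
    }
}
```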


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-22 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197457017
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,99 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+   final long ttl;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   this.ttl = config.getTtl().toMilliseconds();
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
--- End diff --

Typo in method name: `getUnexpried` should be `getUnexpired`.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197192870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,65 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
+   Preconditions.checkArgument(ttl.toMilliseconds() >= 0,
--- End diff --

Maybe we should also check that `ttl` is not null. And I wonder whether 
`ttl.toMilliseconds() == 0` would make any sense?
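A minimal sketch of the extra validation. It assumes the expiration timestamp is computed as write time plus TTL, so a zero TTL would make every value expire immediately under the `expirationTimestamp <= now` check in `AbstractTtlDecorator`; for that reason this sketch rejects zero as well. `java.time.Duration` stands in for Flink's `Time`, `Objects.requireNonNull` for `Preconditions.checkNotNull`, and the class name is hypothetical.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical constructor-argument checks for a TTL config. */
final class TtlArgumentCheckSketch {
    final Duration ttl;

    TtlArgumentCheckSketch(Duration ttl) {
        // Fail fast with a clear message instead of an NPE inside ttl.toMillis().
        Objects.requireNonNull(ttl, "TTL must not be null");
        // A zero TTL would stamp each value as already expired (now + 0 <= now),
        // so only strictly positive values are accepted here.
        if (ttl.toMillis() <= 0) {
            throw new IllegalArgumentException("TTL must be positive, got " + ttl.toMillis() + " ms");
        }
        this.ttl = ttl;
    }
}
```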


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197188305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

Okay, I see this is tricky, and I agree that it should be addressed in 
another PR. We need to figure out a proper way to do it.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197179238
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

This operation fits better into a full-scan restoration from a checkpoint, 
with a custom transformation of each state entry in which the expiration 
timestamp is optionally prolonged by the downtime. The same applies to cleanup 
of expired state during a full scan. I think it should be a separate issue and 
PR, because how could the wrappers distinguish between old and new state before 
and after a restart?
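A minimal sketch of the restore-time transformation described here. It assumes the restore path knows the timestamp of the checkpoint and the time of restoration and treats the gap as downtime (which slightly over-extends entries, since the job was still running for a while after the checkpoint). `RestoreShiftSketch`, `Entry` and `prolongForDowntime` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class RestoreShiftSketch {

    /** Hypothetical stand-in for a restored state entry. */
    static final class Entry<V> {
        final V userValue;
        final long expirationTimestamp;

        Entry(V userValue, long expirationTimestamp) {
            this.userValue = userValue;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    /**
     * Rewrites every entry during a full scan so that the time the job was
     * not running does not count against the TTL.
     */
    static <V> List<Entry<V>> prolongForDowntime(List<Entry<V>> restored, long checkpointTs, long restoreTs) {
        long downtime = Math.max(0L, restoreTs - checkpointTs);
        List<Entry<V>> shifted = new ArrayList<>(restored.size());
        for (Entry<V> e : restored) {
            shifted.add(new Entry<>(e.userValue, e.expirationTimestamp + downtime));
        }
        return shifted;
    }
}
```

Doing this in one pass over the snapshot avoids the ambiguity raised above: the wrappers never see pre-restart timestamps at all.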


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197114442
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

Additionally, if we don't align time on recovery, what is a safe `TTL` value 
to set for a job? (This is the question users always ask us when they try to 
use `TTL` to control state size; we implemented it in a hacky way based on 
`TtlDB`.)


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197111980
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

I'm really not sure whether we should leave it for now. If we do, it will 
become a headache in practical production use.

A very common situation: a job reads data from Kafka, and the user sets 
`TTL = 2 hours` because they believe the data's latency is definitely less than 
2 hours. This way they can use TTL to safely bound the whole state size and 
still get an exact result. But if they find that the job needs to scale up, they 
have to trigger a savepoint and rescale the job from it. What if some problem 
prevents them from recovering the job from the savepoint quickly? Say recovering 
the job takes 30 minutes; then the result becomes inaccurate.

Even if the user never needs to trigger a savepoint, what if the job runs into 
some problem (maybe with a particular machine) and loops in 
"failed-restart-failed-..."? After 2 hours we fix the problem and the job 
resumes automatically, but all of its state has expired. I think this is a 
disaster for the user.

Yes, when using the `EventTime` people this problem won't help, but the 
`ProccessTime` is a very common use case(In our production, most of the job's 
TimeCharacter is `ProccessTime`).

I know Flink's TimeService also didn't do the time align works on recovery, 
but state's TTL is a bit different with Timer. When registering a timer, what 
users offer to the API is a absolute time, but when setting the TTL, what users 
offer is just a relative time, it's us that convert the relative time to a 
absolute time to implement the TTL.
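A minimal sketch of the concern, assuming (as `newExpirationTimestamp()` in `AbstractTtlDecorator` suggests) that the expiration timestamp is the write time plus the relative TTL, and that `expired()` compares it with the current processing time. `RelativeTtlExample` is a hypothetical helper, not Flink code; time is passed in explicitly so that downtime can be simulated.

```java
import java.time.Duration;

/** Hypothetical illustration: a relative TTL becomes an absolute expiration timestamp at write time. */
final class RelativeTtlExample {
    private RelativeTtlExample() {}

    /** Absolute expiration timestamp for a value written at writeTimeMs with a relative TTL. */
    static long expirationTimestamp(long writeTimeMs, Duration ttl) {
        return writeTimeMs + ttl.toMillis();
    }

    /** Same rule as expired() in the diff: expired once expirationTimestamp <= now. */
    static boolean isExpired(long expirationTimestampMs, long nowMs) {
        return expirationTimestampMs <= nowMs;
    }
}
```

For example, with a 2-hour TTL, a value last written 100 minutes before a failure is still valid when the job fails, but is already expired when the job resumes 30 minutes later, although no records were processed in between.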



---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197110068
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+        AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+        implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<T>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
+    }
+
+    private <E> List<E> collect(Iterable<E> iterable) {
+        return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
+    }
+
+    @Override
+    public void updateInternal(List<T> valueToStore) throws Exception {
+        Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
+        original.addAll(withTs(valueToStore));
--- End diff --

This should be `original.updateInternal(withTs(valueToStore));`. I will 
change it.
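The difference matters because, for list state, `update` replaces the stored list while `addAll` appends to it. A minimal in-memory sketch, with a hypothetical `SimpleListState` standing in for the wrapped backend state (it is not Flink's `InternalListState`), shows what delegating `updateInternal` to `addAll` would do:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a backend list state; not Flink's API. */
final class SimpleListState<T> {
    private final List<T> values = new ArrayList<>();

    /** Appends the given values to the stored list. */
    void addAll(List<T> toAdd) {
        values.addAll(toAdd);
    }

    /** Replaces the stored list with the given values. */
    void update(List<T> newValues) {
        values.clear();
        values.addAll(newValues);
    }

    /** Returns a copy of the stored values. */
    List<T> get() {
        return new ArrayList<>(values);
    }
}
```

With the original code, calling `update(...)` on the TTL wrapper would behave like the `addAll` case, keeping stale entries next to the new ones.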


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197109577
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+    final T original;
+    final TtlConfig config;
+    final TtlTimeProvider timeProvider;
+    final boolean updateTsOnRead;
+    final boolean returnExpired;
+
+    AbstractTtlDecorator(
+            T original,
+            TtlConfig config,
+            TtlTimeProvider timeProvider) {
+        Preconditions.checkNotNull(original);
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(timeProvider);
+        Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+            "State does not need to be wrapped with TTL if it is configured as disabled.");
+        this.original = original;
+        this.config = config;
+        this.timeProvider = timeProvider;
+        this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+        this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+    }
+
+    <V> V getUnexpried(TtlValue<V> ttlValue) {
+        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+    }
+
+    <V> boolean expired(TtlValue<V> ttlValue) {
+        return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+    }
+
+    <V> TtlValue<V> wrapWithTs(V value) {
+        return wrapWithTs(value, newExpirationTimestamp());
+    }
+
+    static <V> TtlValue<V> wrapWithTs(V value, long ts) {
+        return value == null ? null : new TtlValue<>(value, ts);
+    }
+
+    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
+        return wrapWithTs(ttlValue.getUserValue());
+    }
+
+    private long newExpirationTimestamp() {
+        long currentTs = timeProvider.currentTimestamp();
+        long ttl = config.getTtl().toMilliseconds();
--- End diff --

I think the JIT will optimise it, as this is the only usage, but I will change 
it just in case.
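Assuming the suggestion was to convert the TTL to milliseconds once instead of on every write, a hypothetical sketch could look like this. It takes a `java.time.Duration` instead of Flink's `TtlConfig`, uses a `LongSupplier` in place of `TtlTimeProvider`, and saturates at `Long.MAX_VALUE` on overflow, which is an addition of this sketch rather than something the diff shows.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical sketch: the TTL is converted to milliseconds once, in the constructor. */
final class ExpirationCalculator {
    private final LongSupplier currentTimeMs; // stands in for TtlTimeProvider
    private final long ttlMs;                 // cached instead of converting on every call

    ExpirationCalculator(Duration ttl, LongSupplier currentTimeMs) {
        if (ttl.isNegative() || ttl.isZero()) {
            throw new IllegalArgumentException("TTL must be positive");
        }
        this.ttlMs = ttl.toMillis();
        this.currentTimeMs = currentTimeMs;
    }

    /** Current time plus TTL; since ttlMs > 0, a sum smaller than now means overflow. */
    long newExpirationTimestamp() {
        long now = currentTimeMs.getAsLong();
        long expiration = now + ttlMs;
        return expiration < now ? Long.MAX_VALUE : expiration;
    }
}
```

Whether the field is worth it is mostly a readability choice; as noted, the JIT may well optimise the repeated conversion anyway.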


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197108498
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+        AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+        implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<T>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

I think this is out of TTL scope for now. The merge-namespace methods are not 
part of the user-facing API. They are used only internally for windowing, and 
I am not sure they will ever need TTL. In any case, the read part of the API 
should eventually evict expired state, merged or not. For aggregating states, 
merge already uses the wrapped reading methods of the aggregate functions 
underneath, which should also speed up cleanup if they are ever used with 
`mergeNamespaces`.
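The point that reads eventually evict expired state, merged or not, can be illustrated with a small hypothetical sketch: merging just concatenates the stored entries without any TTL check, and the read path filters by expiration timestamp using the same `expirationTimestamp <= now` rule as `expired()` in `AbstractTtlDecorator`. `TimestampedValue` and `MergeThenFilter` are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for TtlValue: a user value plus its expiration timestamp. */
final class TimestampedValue<T> {
    final T userValue;
    final long expirationTimestamp;

    TimestampedValue(T userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

final class MergeThenFilter {
    private MergeThenFilter() {}

    /** Merge without any TTL check: entries from all source namespaces are concatenated as stored. */
    static <T> List<TimestampedValue<T>> merge(List<List<TimestampedValue<T>>> sources) {
        List<TimestampedValue<T>> merged = new ArrayList<>();
        for (List<TimestampedValue<T>> source : sources) {
            merged.addAll(source);
        }
        return merged;
    }

    /** Read path: only entries whose expiration timestamp is still in the future are returned. */
    static <T> List<T> read(List<TimestampedValue<T>> stored, long nowMs) {
        return stored.stream()
            .filter(v -> v.expirationTimestamp > nowMs)
            .map(v -> v.userValue)
            .collect(Collectors.toList());
    }
}
```

Expired entries may survive the merge itself, but they are never returned to the user.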


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197102118
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+        AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+        implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<T>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
+    }
+
+    private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

Well, I'm not sure it matters at this point: it is private and used just once, 
with a collected `List`. My thinking was that it is less verbose: 
`list.stream()` vs `StreamSupport.stream(iterable.spliterator(), false)`.
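For comparison, both forms produce an equal list; `StreamSupport.stream(spliterator, false)` is only needed when all you have is an `Iterable`. A small sketch with hypothetical helper names:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

final class CollectExamples {
    private CollectExamples() {}

    /** Works for any Iterable, e.g. the lazily produced result of original.get(). */
    static <E> List<E> collect(Iterable<E> iterable) {
        return StreamSupport.stream(iterable.spliterator(), false) // false = sequential stream
            .collect(Collectors.toList());
    }

    /** Shorter form, available once the values are already a List. */
    static <E> List<E> copy(List<E> list) {
        return list.stream().collect(Collectors.toList());
    }
}
```

Note that `Collectors.toList()` makes no guarantee about the concrete type or mutability of the returned list in either case.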


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197080576
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+        AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+        implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<T>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
+    }
+
+    private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

If we need to "update on read", this part is a bit confusing to me. Currently 
we attach a TTL to every list item, so "update on read" should be scoped to 
the list item, not the whole list. That makes me feel that passing an 
`Iterable` to `updateTs` would be more reasonable. What do you think?
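A minimal sketch of what a per-item `updateTs` over an `Iterable` could look like, assuming each entry carries its own expiration timestamp. `Entry`, `renewUnexpired` and the explicit `nowMs`/`ttlMs` parameters are hypothetical stand-ins for `TtlValue`, `updateTs` and the time provider/config.

```java
import java.util.ArrayList;
import java.util.List;

final class PerItemRenewal {
    private PerItemRenewal() {}

    /** Hypothetical stand-in for TtlValue. */
    static final class Entry<T> {
        final T userValue;
        final long expirationTimestamp;

        Entry(T userValue, long expirationTimestamp) {
            this.userValue = userValue;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    /**
     * Drops expired entries and renews the timestamp of each remaining entry individually,
     * so every list item keeps its own TTL instead of sharing one for the whole list.
     */
    static <T> List<Entry<T>> renewUnexpired(Iterable<Entry<T>> entries, long nowMs, long ttlMs) {
        List<Entry<T>> renewed = new ArrayList<>();
        for (Entry<T> e : entries) {
            if (e.expirationTimestamp > nowMs) { // same rule as !expired(v)
                renewed.add(new Entry<>(e.userValue, nowMs + ttlMs));
            }
        }
        return renewed;
    }
}
```

The renewed entries still have to be collected before they can be written back with something like `original.update(...)`, so the change mainly concerns which type the helper accepts.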


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197079917
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+    private final T userValue;
+    private final long expirationTimestamp;
--- End diff --

I think it is a bit out of scope for this PR. I thought about this concern, 
but the semantics of processing time are similar to real clock time, and it 
does not stop when the job is stopped. There is also the event time option, 
which gives more control over time. I would leave it for now. This is more 
about users migrating checkpoints. We can add a comment/open question about it 
to the design doc.
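Since expiration is always checked against `timeProvider.currentTimestamp()`, where "now" comes from is the crux of this discussion. A hypothetical sketch: a wall-clock source keeps advancing while the job is down, whereas a manually driven source, closer in spirit to event time, only moves when something advances it. The `TimeSource` interface is modelled on the `currentTimestamp()` call in the diff, not copied from Flink.

```java
/** Hypothetical interface modelled on the currentTimestamp() call used in the diff. */
interface TimeSource {
    long currentTimestamp();
}

/** Wall-clock time: keeps advancing even while no job is running. */
final class WallClockTimeSource implements TimeSource {
    @Override
    public long currentTimestamp() {
        return System.currentTimeMillis();
    }
}

/** Time that only moves when explicitly advanced, e.g. by processed records or a test. */
final class ManualTimeSource implements TimeSource {
    private long now;

    ManualTimeSource(long start) {
        this.now = start;
    }

    /** Never moves backwards, similar to how a watermark only increases. */
    void advanceTo(long timestamp) {
        now = Math.max(now, timestamp);
    }

    @Override
    public long currentTimestamp() {
        return now;
    }
}
```

An expiration check written against `currentTimestamp()` behaves very differently depending on which implementation is plugged in, which is why the choice of time characteristic matters for TTL.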


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197077377
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+        AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+        implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<T>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
+    }
+
+    private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

In general, the idea is to keep the `get()` method as lazy as possible: return 
an `Iterable` and avoid querying the backend if not needed. At the moment, list 
state internally stores a `List` according to the API, not an `Iterable` 
(something to think about in the future). So `getInternal()` returns a `List`, 
i.e. the collected `Iterable`. I agree it looks weird, but if `get()` called 
`getInternal()`, it would force collecting a `List`.
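As a rough illustration of keeping `get()` lazy, here is a hypothetical, simplified stand-in for `IteratorWithCleanup` (whose body is not shown in this diff): an iterator that skips expired entries only as the caller advances, so nothing is collected up front. Unlike the real wrapper it writes nothing back to the backend; `Stamped` and `UnexpiredIterator` are illustrative names.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.LongPredicate;

/** Hypothetical stand-in for TtlValue. */
final class Stamped<T> {
    final T value;
    final long expirationTimestamp;

    Stamped(T value, long expirationTimestamp) {
        this.value = value;
        this.expirationTimestamp = expirationTimestamp;
    }
}

/** Lazily yields only unexpired user values from the wrapped iterator. */
final class UnexpiredIterator<T> implements Iterator<T> {
    private final Iterator<Stamped<T>> source;
    private final LongPredicate isExpired; // tests an expiration timestamp against "now"
    private Stamped<T> next;               // next unexpired entry, or null if not yet found

    UnexpiredIterator(Iterator<Stamped<T>> source, LongPredicate isExpired) {
        this.source = source;
        this.isExpired = isExpired;
    }

    @Override
    public boolean hasNext() {
        // Advance the underlying iterator only until one unexpired entry is found.
        while (next == null && source.hasNext()) {
            Stamped<T> candidate = source.next();
            if (!isExpired.test(candidate.expirationTimestamp)) {
                next = candidate;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T value = next.value;
        next = null;
        return value;
    }
}
```

Returning `() -> new UnexpiredIterator<>(...)` mirrors `return () -> new IteratorWithCleanup(finalResult.iterator());` in the diff: every iteration over the returned `Iterable` starts a fresh pass over the stored entries.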

If we do not need update on read, everything can still stay lazy in the final 
resulting `IteratorWithCleanup`: one more potential user iteration over the 
original `Iterable` from

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001110
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReducingState<K, N, T>
+        extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
+        implements InternalReducingState<K, N, T> {
+    TtlReducingState(
+            InternalReducingState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<T> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public T get() throws Exception {
+        return getInternal();
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        original.add(wrapWithTs(value, Long.MAX_VALUE));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the TTL check for `original.mergeNamespaces()`, since 
we need to query the state when merging namespaces?
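If merging were made TTL-aware, one option would be to drop expired partial results before reducing them. The sketch below is hypothetical (the diff delegates straight to `original.mergeNamespaces(...)`): it reduces the per-namespace values with a `BinaryOperator`, skips entries that have expired, and returns an empty `Optional` when nothing is left. `Reduced` stands in for `TtlValue`.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.function.BinaryOperator;

final class TtlAwareReduceMerge {
    private TtlAwareReduceMerge() {}

    /** Hypothetical stand-in for TtlValue holding a reduced value. */
    static final class Reduced<T> {
        final T value;
        final long expirationTimestamp;

        Reduced(T value, long expirationTimestamp) {
            this.value = value;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    /** Reduces the unexpired values of all source namespaces; expired ones are ignored. */
    static <T> Optional<T> merge(Collection<Reduced<T>> sources, BinaryOperator<T> reducer, long nowMs) {
        return sources.stream()
            .filter(r -> r != null && r.expirationTimestamp > nowMs) // same rule as !expired(v)
            .map(r -> r.value)
            .reduce(reducer);
    }
}
```

Whether such filtering belongs in `mergeNamespaces` at all is the open question here; the sketch only shows that it would be a small step on top of the reduce itself.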


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196995820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws Exception {
+   original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the `TTL` check for `original.mergeNamespaces()`, since merging namespaces needs to read the state?
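Here is a minimal sketch of such a check for list state. It assumes a plain `Map` from namespace to list stands in for the backend. `TtlEntry` and `TtlListMerge` are hypothetical names, not classes from this PR. Entries that have already expired are dropped during the merge instead of being carried over into the target namespace.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the PR's TtlValue: a user value plus an absolute expiration timestamp.
final class TtlEntry<T> {
    final T value;
    final long expirationTs;

    TtlEntry(T value, long expirationTs) {
        this.value = value;
        this.expirationTs = expirationTs;
    }
}

final class TtlListMerge {
    // Same rule as expired() in AbstractTtlDecorator: expired once expirationTs <= now.
    static boolean expired(TtlEntry<?> entry, long now) {
        return entry.expirationTs <= now;
    }

    // Merges the lists of all source namespaces into the target namespace,
    // keeping only unexpired entries. The map stands in for the state backend.
    static <N, T> void mergeNamespaces(
            Map<N, List<TtlEntry<T>>> state, N target, Collection<N> sources, long now) {
        List<TtlEntry<T>> merged = new ArrayList<>();
        addUnexpired(state.get(target), merged, now);
        for (N source : sources) {
            if (source.equals(target)) {
                continue; // the target's own entries were already added above
            }
            addUnexpired(state.remove(source), merged, now);
        }
        if (merged.isEmpty()) {
            state.remove(target); // nothing alive: do not keep an empty entry
        } else {
            state.put(target, merged);
        }
    }

    private static <T> void addUnexpired(List<TtlEntry<T>> entries, List<TtlEntry<T>> out, long now) {
        if (entries == null) {
            return;
        }
        for (TtlEntry<T> entry : entries) {
            if (!expired(entry, now)) {
                out.add(entry);
            }
        }
    }
}
```

Whether expired entries should also be purged from the target itself, or only from the sources, is a design choice. This sketch filters both.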


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196998472
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

If we call `getInternal()` in `get()` and make `updateTs()` accept an `Iterable`, then this method could probably be removed. (Or at least, we should check whether `iterable` is already a `List`; if so, we could cast it to `List` and return it immediately.)
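Here is a sketch of the fallback described above. It assumes the caller treats the result as read-only, because the fast path returns the caller's own list rather than a copy. `IterableUtil` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class IterableUtil {
    // Returns the elements of the iterable as a List. If the iterable already is a
    // List, it is returned as-is (no copy); otherwise its elements are copied once.
    static <E> List<E> collect(Iterable<E> iterable) {
        if (iterable instanceof List) {
            return (List<E>) iterable;
        }
        List<E> result = new ArrayList<>();
        for (E element : iterable) {
            result.add(element);
        }
        return result;
    }
}
```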


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197004891
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

The `expirationTimestamp` is an absolute timestamp. Should we shift the timestamps of `TtlValue` on checkpoint and recovery? For example, suppose the user uses `ProcessTime` as the time characteristic and sets `TTL = 10min`. For some reason, the user triggers a savepoint and recovers the job from it 11 minutes later. If we don't shift the timestamps, all of the state will have expired.
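Here is a sketch that makes the proposed shift concrete. It assumes the snapshot records the processing time at which it was taken; this `snapshotTime` is hypothetical, since nothing in the diff stores it. The remaining lifetime, `expirationTs - snapshotTime`, is carried over to the restore time.

```java
final class TtlTimestampShift {
    // Rebases an absolute expiration timestamp, recorded before a snapshot taken at
    // snapshotTime, onto restoreTime, so the remaining lifetime is preserved.
    static long shiftExpiration(long expirationTs, long snapshotTime, long restoreTime) {
        long remaining = expirationTs - snapshotTime; // may be <= 0 if already expired at snapshot time
        return restoreTime + remaining;
    }

    // Same rule as expired() in the diff.
    static boolean expired(long expirationTs, long now) {
        return expirationTs <= now;
    }
}
```

Take TTL = 10 min (600 000 ms) and a value written at t = 0, so it expires at 600 000. Suppose the savepoint is taken at 60 000 and the job is restored at 720 000, 11 minutes later. The unshifted value is already expired at that point. The shifted one expires at 1 260 000, keeping the 9 minutes it had left at snapshot time. This only seems meaningful for processing time, where the clock keeps advancing while the job is down.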


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
 ---
@@ -0,0 +1,80 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the value added to the state
+ * @param  The type of the accumulator (intermediate aggregate state).
+ * @param  Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState
+   extends AbstractTtlState, InternalAggregatingState, OUT>>
+   implements InternalAggregatingState {
+
+   TtlAggregatingState(
+   InternalAggregatingState, OUT> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer valueSerializer,
+   TtlAggregateFunction aggregateFunction) {
+   super(originalState, config, timeProvider, valueSerializer);
+   aggregateFunction.stateClear = originalState::clear;
+   aggregateFunction.updater = originalState::updateInternal;
+   }
+
+   @Override
+   public OUT get() throws Exception {
+   return original.get();
+   }
+
+   @Override
+   public void add(IN value) throws Exception {
+   original.add(value);
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public ACC getInternal() throws Exception {
+   return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
+   }
+
+   @Override
+   public void updateInternal(ACC valueToStore) throws Exception {
+   original.updateInternal(wrapWithTs(valueToStore));
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws Exception {
+   original.mergeNamespaces(target, sources);
--- End diff --

Should we also do the TTL check for `original.mergeNamespaces()`, since merging namespaces needs to read the state?
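For aggregating state, the same idea would apply to accumulators. This sketch assumes a `Map` as the backend and a `BinaryOperator` as the merge function. In the real state, the merge function would presumably be the user's `AggregateFunction#merge`. `TtlAcc` and `TtlAggregatingMerge` are hypothetical names. The sketch also assumes the merged accumulator counts as a write, so it gets a fresh expiration timestamp.

```java
import java.util.Collection;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for TtlValue<ACC>: an accumulator plus its absolute expiration timestamp.
final class TtlAcc<A> {
    final A acc;
    final long expirationTs;

    TtlAcc(A acc, long expirationTs) {
        this.acc = acc;
        this.expirationTs = expirationTs;
    }
}

final class TtlAggregatingMerge {
    // Merges the accumulators of the target and all sources with mergeFn, skipping
    // expired ones. Assumption: the merged result counts as a write, so it gets a
    // fresh expiration timestamp (now + ttlMillis).
    static <N, A> void mergeNamespaces(
            Map<N, TtlAcc<A>> state, N target, Collection<N> sources,
            BinaryOperator<A> mergeFn, long now, long ttlMillis) {
        A result = unexpired(state.get(target), now);
        for (N source : sources) {
            if (source.equals(target)) {
                continue;
            }
            A acc = unexpired(state.remove(source), now);
            if (acc != null) {
                result = result == null ? acc : mergeFn.apply(result, acc);
            }
        }
        if (result == null) {
            state.remove(target); // every input was missing or expired
        } else {
            state.put(target, new TtlAcc<>(result, now + ttlMillis));
        }
    }

    // Returns the accumulator if present and not expired, otherwise null.
    private static <A> A unexpired(TtlAcc<A> value, long now) {
        return value == null || value.expirationTs <= now ? null : value.acc;
    }
}
```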


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996094
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
--- End diff --

This looks a bit weird. My gut feeling is that `get()` should call `getInternal()` (just as `update()` calls `updateInternal()` in this class), but here it is the other way around.
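Here is a toy sketch of the suggested call direction, with TTL left out entirely. `LayeredListState` is hypothetical. It only shows `get()` as a thin view over `getInternal()`, in the same way that `update()` delegates to `updateInternal()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified list state (no TTL) that only shows the call direction:
// public methods are thin wrappers over the *Internal methods.
final class LayeredListState<T> {
    private List<T> stored; // stand-in for the wrapped backend state

    void update(List<T> values) {
        updateInternal(values);
    }

    void updateInternal(List<T> values) {
        stored = new ArrayList<>(values);
    }

    // Does the real work and returns the full List.
    List<T> getInternal() {
        return stored == null ? Collections.emptyList() : stored;
    }

    // Public view: delegates to getInternal(), mirroring update() -> updateInternal().
    Iterable<T> get() {
        return Collections.unmodifiableList(getInternal());
    }
}
```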


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996839
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
--- End diff --

In this block, we need to iterate over `ttlValue` twice: once in `collect()` and once in `updateTs()`. If `updateTs()` accepted an `Iterable` as its argument, we could avoid the `collect()` here and iterate over `ttlValue` only once.
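Here is a sketch of a single-pass `updateTs()` that takes an `Iterable`. It assumes the write-back is passed in as a callback standing in for `original.update(...)`. `TsEntry` and `SinglePassRefresh` are hypothetical names. Expired entries are skipped and the survivors are rewrapped in the same loop, so a lazily backed iterable is traversed only once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for TtlValue<T>.
final class TsEntry<T> {
    final T value;
    final long expirationTs;

    TsEntry(T value, long expirationTs) {
        this.value = value;
        this.expirationTs = expirationTs;
    }
}

final class SinglePassRefresh {
    // Walks the stored entries exactly once: expired entries are skipped and
    // unexpired ones are rewrapped with a fresh expiration timestamp. The writer
    // stands in for original.update(...) and is only called if something survives.
    static <T> List<TsEntry<T>> updateTs(
            Iterable<TsEntry<T>> stored, long now, long ttlMillis, Consumer<List<TsEntry<T>>> writer) {
        List<TsEntry<T>> refreshed = new ArrayList<>();
        for (TsEntry<T> entry : stored) {
            if (entry.expirationTs > now) {
                refreshed.add(new TsEntry<>(entry.value, now + ttlMillis));
            }
        }
        if (!refreshed.isEmpty()) {
            writer.accept(refreshed);
        }
        return refreshed;
    }
}
```

`get()` could then return the refreshed list directly. It contains only unexpired entries, which fits the strict visibility setting. With relaxed visibility, where expired values may still be returned, the expired entries would have to be kept for the result.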


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196995095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,63 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
--- End diff --

Maybe we should also check that the `ttl` is greater than 0?
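Here is a sketch of the suggested validation. Inside `TtlConfig`, it could be a single `Preconditions.checkArgument(ttl.toMilliseconds() > 0, ...)` after a null check on `ttl`. The standalone version below swaps in `java.time.Duration` for Flink's `Time` and uses a hypothetical class name.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical, trimmed-down config that only models the TTL length.
// java.time.Duration is used here in place of Flink's Time.
final class TtlConfigSketch {
    final long ttlMillis;

    TtlConfigSketch(Duration ttl) {
        Objects.requireNonNull(ttl, "ttl must not be null");
        // Expiration timestamps are in milliseconds, so anything below 1 ms is rejected too.
        if (ttl.toMillis() <= 0) {
            throw new IllegalArgumentException("ttl must be positive, got " + ttl);
        }
        this.ttlMillis = ttl.toMillis();
    }
}
```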


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196827863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
--- End diff --

Oh sorry, my bad, I misunderstood...


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196826339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
--- End diff --

`ttlValue` is reassigned before the lambda in the return statement. That means it is no longer effectively final and cannot be captured by the lambda, so `finalResult` is formally needed to avoid a compilation error.
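Here is a minimal standalone reproduction of this situation, using hypothetical names. Reassigning a local variable means it is no longer effectively final, so a lambda cannot capture it. Copying it into a variable that is assigned only once fixes the compilation error.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

final class EffectivelyFinalDemo {
    static Supplier<Integer> sizeSupplier(List<String> stored) {
        List<String> values = stored;
        values = values == null ? Collections.emptyList() : values; // second assignment
        // return () -> values.size();
        //   ^ does not compile: "local variables referenced from a lambda expression
        //     must be final or effectively final"
        final List<String> finalValues = values; // assigned exactly once, so it can be captured
        return () -> finalValues.size();
    }
}
```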


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196820474
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,98 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+   }
+
+   TtlValue wrapWithTs(V value) {
+   return wrapWithTs(value, newExpirationTimestamp());
+   }
+
+   static  TtlValue wrapWithTs(V value, long ts) {
+   return value == null ? null : new TtlValue<>(value, ts);
+   }
+
+   TtlValue rewrapWithNewTs(TtlValue ttlValue) {
+   return wrapWithTs(ttlValue.getUserValue());
+   }
+
+   private long newExpirationTimestamp() {
+   long currentTs = timeProvider.currentTimestamp();
+   long ttl = config.getTtl().toMilliseconds();
--- End diff --

This will be called quite often, so does it make sense to introduce a field that caches `config.getTtl().toMilliseconds()`?
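Here is a sketch of the suggested caching. It assumes the config is immutable, so the value can be computed once in the constructor. `CachedTtlDecorator` is hypothetical; `LongSupplier` stands in for `TtlTimeProvider`, and `java.time.Duration` stands in for Flink's `Time`.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical decorator fragment. LongSupplier stands in for TtlTimeProvider and
// Duration for the config's Time; the TTL is converted to milliseconds once.
final class CachedTtlDecorator {
    private final LongSupplier clock;
    private final long ttlMillis; // cached instead of calling config.getTtl().toMilliseconds() per write

    CachedTtlDecorator(LongSupplier clock, Duration ttl) {
        this.clock = clock;
        this.ttlMillis = ttl.toMillis();
    }

    long newExpirationTimestamp() {
        return clock.getAsLong() + ttlMillis;
    }
}
```

The conversion itself is probably cheap, so the gain is likely small. The main effect is that each write only needs one clock read and one addition.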


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196809755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param  Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   V getUnexpried(TtlValue ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   boolean expired(TtlValue ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+   }
+
+   TtlValue wrapWithTs(V value) {
+   return wrapWithTs(value, newExpirationTimestamp());
+   }
+
+   static  TtlValue wrapWithTs(V value, long ts) {
+   return value == null ? null : new TtlValue<>(value, ts);
+   }
+
+   TtlValue rewrapWithNewTs(TtlValue ttlValue) {
+   return wrapWithTs(ttlValue.getUserValue());
+   }
+
+   private long newExpirationTimestamp() {
+   return timeProvider.currentTimestamp() + config.getTtl().toMilliseconds();
--- End diff --

This will be called quite often, so does it make sense to introduce a field that caches `config.getTtl().toMilliseconds()`?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196817846
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<T>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable<TtlValue<T>> finalResult = ttlValue;
--- End diff --

The variable `finalResult` looks redundant, unless I'm misunderstanding something.
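One possible reason for the extra variable: a Java lambda may only capture local variables that are final or effectively final. Because `ttlValue` is reassigned after its declaration, `() -> new IteratorWithCleanup(ttlValue.iterator())` would presumably not compile. A minimal standalone illustration (`LambdaCaptureDemo` and `view` are hypothetical names, not Flink code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, standalone illustration; not Flink code.
class LambdaCaptureDemo {
    static Iterable<String> view(Iterable<String> source, boolean copy) {
        Iterable<String> result = source;
        // Reassigning 'result' makes it no longer effectively final,
        // mirroring 'ttlValue' in TtlListState#get().
        result = result == null ? Collections.emptyList() : result;
        if (copy) {
            List<String> copied = new ArrayList<>();
            result.forEach(copied::add);
            result = copied;
        }
        // "return () -> result.iterator();" would be rejected by javac here:
        // locals referenced from a lambda must be final or effectively final.
        final Iterable<String> finalResult = result;
        return () -> finalResult.iterator();
    }
}
```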


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196816259
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<T>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable<TtlValue<T>> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+   List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List<T> getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
+   return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
+   }
+
+   @Override
+   public void updateInternal(List<T> valueToStore) throws Exception {
+   Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
+   original.addAll(withTs(valueToStore));
--- End diff --

This seems to be missing a `clear()`: `updateInternal` should replace the current list, but `addAll` only appends to it.
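A minimal sketch of the difference, using a hypothetical in-memory list (`InMemoryListState` is illustrative, not Flink's `InternalListState`): calling only `addAll` keeps the old entries, while update semantics require dropping them first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a list state; not Flink's InternalListState.
class InMemoryListState<T> {
    private final List<T> values = new ArrayList<>();

    void addAll(List<T> toAdd) {
        values.addAll(toAdd);
    }

    void clear() {
        values.clear();
    }

    List<T> get() {
        return new ArrayList<>(values);
    }

    // What the reviewed updateInternal effectively does: append.
    void updateAppending(List<T> newValues) {
        addAll(newValues);
    }

    // Replace semantics: drop the old contents first, then add the new ones.
    void updateReplacing(List<T> newValues) {
        clear();
        addAll(newValues);
    }
}
```

In the PR itself, delegating to `original.update(withTs(valueToStore))` would presumably achieve the same, since the wrapped list state also offers `update(List)` (as the `update` override above shows).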


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196819320
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

In general, Flink allows event time to be in the negative range, but overflow 
in the case of a very large TTL should be checked. A TTL only makes sense if it 
is non-negative. I have added a fix for it.
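One way to do such an overflow check, sketched as a hypothetical helper (`TtlTimestamps.expirationTimestamp` is illustrative and not the fix that was actually committed): saturate at `Long.MAX_VALUE` instead of wrapping around, while still accepting negative event-time timestamps and rejecting negative TTLs.

```java
// Hypothetical helper; not the fix committed to the PR.
final class TtlTimestamps {
    private TtlTimestamps() {
    }

    // Returns ts + ttl, saturating at Long.MAX_VALUE instead of overflowing.
    // 'ts' may be negative (event time); 'ttl' must be non-negative.
    static long expirationTimestamp(long ts, long ttl) {
        if (ttl < 0) {
            throw new IllegalArgumentException("TTL must be non-negative: " + ttl);
        }
        // With ttl >= 0, Long.MAX_VALUE - ttl cannot overflow, and ts + ttl
        // overflows exactly when ts > Long.MAX_VALUE - ttl.
        return ts > Long.MAX_VALUE - ttl ? Long.MAX_VALUE : ts + ttl;
    }
}
```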


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196791555
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
--- End diff --

Why not check `ttl` for null as well?
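A reduced sketch of the requested check, assuming a hypothetical `SketchTtlConfig` that keeps only two of the four fields, uses `java.time.Duration` in place of Flink's `Time`, and uses `java.util.Objects.requireNonNull` in place of Flink's `Preconditions.checkNotNull`:

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical, reduced config; not the PR's TtlConfig.
final class SketchTtlConfig {
    enum UpdateType { Disabled, OnReadAndWrite }

    private final UpdateType ttlUpdateType;
    private final Duration ttl;

    SketchTtlConfig(UpdateType ttlUpdateType, Duration ttl) {
        this.ttlUpdateType = Objects.requireNonNull(ttlUpdateType, "ttlUpdateType");
        // The check asked about in the review: fail fast in the constructor
        // rather than with an NPE later, when the TTL is first converted.
        this.ttl = Objects.requireNonNull(ttl, "ttl");
    }

    UpdateType getTtlUpdateType() {
        return ttlUpdateType;
    }

    Duration getTtl() {
        return ttl;
    }
}
```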


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196786370
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

Does it make sense to never expire the value when 
`ttlValue.getExpirationTimestamp()` returns a negative value?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196784275
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

This looks a bit problematic, because `ttlValue.getExpirationTimestamp()` 
might be negative. E.g. when the user provides `Long.MAX_VALUE` as the TTL, 
they expect the value never to expire, but with the current code 
`currentTimestamp() + ttl` overflows to a negative number and the value 
expires immediately.
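The overflow can be reproduced with a hypothetical reduction of the reviewed `newExpirationTimestamp()` and `expired()` logic, where the time provider is replaced by an explicit `now` argument (`OverflowDemo` is illustrative only):

```java
// Hypothetical reduction of the reviewed logic; not Flink code.
final class OverflowDemo {
    private OverflowDemo() {
    }

    // Same arithmetic as newExpirationTimestamp(): no overflow check.
    static long newExpirationTimestamp(long now, long ttlMillis) {
        return now + ttlMillis;
    }

    // Same comparison as expired(): expired once 'now' reaches the timestamp.
    static boolean expired(long expirationTimestamp, long now) {
        return expirationTimestamp <= now;
    }
}
```

With `now = 1_000` and `ttlMillis = Long.MAX_VALUE`, the sum wraps around to `Long.MIN_VALUE + 999`, so `expired(...)` already returns `true` on the first read.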


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread azagrebin
GitHub user azagrebin opened a pull request:

https://github.com/apache/flink/pull/6186

[FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

## What is the purpose of the change

This PR introduces TTL logic wrappers for state objects.

## Brief change log

Added
  - sketch of TtlConfig
  - AbstractTtlWrapper and AbstractTtlState
  - concrete TTL wrappers for state objects
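To make the wrapping idea concrete, here is a minimal sketch of a TTL decorator over a value state. It assumes a hypothetical `SimpleValueState` interface in place of Flink's state interfaces; `TimestampedValue` plays the role of `TtlValue`, and all names are illustrative rather than the PR's classes. Only "renew on write" and "never return expired" are modelled, and overflow handling is omitted.

```java
import java.util.function.LongSupplier;

// Hypothetical minimal state interface; Flink's ValueState API differs.
interface SimpleValueState<V> {
    V value();

    void update(V value);
}

// Plain heap-backed implementation used as the wrapped ("original") state.
final class HeapValueState<V> implements SimpleValueState<V> {
    private V current;

    @Override
    public V value() {
        return current;
    }

    @Override
    public void update(V value) {
        current = value;
    }
}

// Pairs a user value with its expiration timestamp, similar in spirit to TtlValue.
final class TimestampedValue<V> {
    final V userValue;
    final long expirationTimestamp;

    TimestampedValue(V userValue, long expirationTimestamp) {
        this.userValue = userValue;
        this.expirationTimestamp = expirationTimestamp;
    }
}

// Decorator: exposes SimpleValueState<V> but stores TimestampedValue<V>
// in the wrapped state and hides entries whose timestamp has passed.
final class TtlValueStateSketch<V> implements SimpleValueState<V> {
    private final SimpleValueState<TimestampedValue<V>> original;
    private final long ttlMillis;
    private final LongSupplier clock;

    TtlValueStateSketch(SimpleValueState<TimestampedValue<V>> original, long ttlMillis, LongSupplier clock) {
        this.original = original;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    @Override
    public V value() {
        TimestampedValue<V> stored = original.value();
        boolean expired = stored != null && stored.expirationTimestamp <= clock.getAsLong();
        return stored == null || expired ? null : stored.userValue;
    }

    @Override
    public void update(V value) {
        // Every write stamps a fresh expiration time.
        TimestampedValue<V> wrapped =
            value == null ? null : new TimestampedValue<>(value, clock.getAsLong() + ttlMillis);
        original.update(wrapped);
    }
}
```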

## Verifying this change

Unit tests for the TTL wrappers of state objects

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not applicable at this step)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/azagrebin/flink FLINK-9515

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6186


commit 62faa8ee220c21fa824fec690073c27a0a994be5
Author: Andrey Zagrebin 
Date:   2018-06-04T15:28:40Z

[FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers




---