[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202517181
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
 ---
@@ -85,10 +85,10 @@ public int compare(T o1, T o2) {
return ((Comparable) o1).compareTo(o2);
}
 
-   // we catch this case before moving to more expensive tie breaks.
-   if (o1.equals(o2)) {
-   return 0;
-   }
+// // we catch this case before moving to more expensive tie breaks.
--- End diff --

For what reason do we need to comment this out?


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202519256
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -446,8 +485,10 @@ public String toString() {
@Override
public int numStateEntries() {
int sum = 0;
-   for (StateTable stateTable : stateTables.values()) {
-   sum += stateTable.size();
+   for (StateSnapshotRestore stateTable : registeredStates.values()) {
--- End diff --

nit: the name `stateTable` is a bit confusing, since it is now the 
`RegisteredState` (which might not be a `StateTable`)...


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202517247
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
 ---
@@ -305,6 +351,6 @@ private void checkRefillCacheFromStore() {
 * after usage.
 */
@Nonnull
-   CloseableIterator orderedIterator();
+   CloseableIterator orderedIterator();;
--- End diff --

a duplicated `;`


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202516863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupPartitioner.java
 ---
@@ -264,6 +265,42 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId)
}
}
 
+   public static <T> StateSnapshotKeyGroupReader createKeyGroupPartitionReader(
+   @Nonnull ElementReaderFunction<T> readerFunction,
+   @Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
+   return new PartitioningResultKeyGroupReader<>(readerFunction, elementConsumer);
+   }
+
+   /**
+    * General algorithm to read key-grouped state that was written from a {@link PartitioningResult}
+    * @param <T>
--- End diff --

description for `T` is missing.
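For illustration, a completed header could look like this (the description wording is just a suggestion, not from the PR):

```java
/**
 * General algorithm to read key-grouped state that was written from a {@link PartitioningResult}.
 *
 * @param <T> type of the elements read by this reader.
 */
```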


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202519024
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSnapshotRestoreWrapper.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
+import org.apache.flink.runtime.state.StateSnapshotRestore;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * This wrapper combines a HeapPriorityQueue with backend meta data.
+ *
+ * @param <T> type of the queue elements.
+ */
+public class HeapPriorityQueueSnapshotRestoreWrapper<T extends HeapPriorityQueueElement>
+   implements StateSnapshotRestore {
+
+   @Nonnull
+   private final HeapPriorityQueueSet<T> priorityQueue;
+   @Nonnull
+   private final KeyExtractorFunction<T> keyExtractorFunction;
+   @Nonnull
+   private final RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo;
+   @Nonnull
+   private final KeyGroupRange localKeyGroupRange;
+   @Nonnegative
+   private final int totalKeyGroups;
+
+   public HeapPriorityQueueSnapshotRestoreWrapper(
+   @Nonnull HeapPriorityQueueSet<T> priorityQueue,
+   @Nonnull RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo,
+   @Nonnull KeyExtractorFunction<T> keyExtractorFunction,
+   @Nonnull KeyGroupRange localKeyGroupRange,
+   int totalKeyGroups) {
+
+   this.priorityQueue = priorityQueue;
+   this.keyExtractorFunction = keyExtractorFunction;
+   this.metaInfo = metaInfo;
+   this.localKeyGroupRange = localKeyGroupRange;
+   this.totalKeyGroups = totalKeyGroups;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Nonnull
+   @Override
+   public StateSnapshot stateSnapshot() {
+   final T[] queueDump = (T[]) priorityQueue.toArray(new HeapPriorityQueueElement[priorityQueue.size()]);
+
+   final TypeSerializer<T> elementSerializer = metaInfo.getElementSerializer();
+
+   // turn the flat copy into a deep copy if required.
+   if (!elementSerializer.isImmutableType()) {
+   for (int i = 0; i < queueDump.length; ++i) {
+   queueDump[i] = elementSerializer.copy(queueDump[i]);
+   }
+   }
+
+   return new HeapPriorityQueueStateSnapshot<>(
+   queueDump,
+   keyExtractorFunction,
+   metaInfo,
--- End diff --

We only dump the queued elements here; should we also take a snapshot of 
the metaInfo, in case some parts of it might not be immutable?
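
For illustration, a minimal sketch of the concern, with made-up names rather than the actual Flink classes: a snapshot that only keeps a reference to mutable meta info can observe later modifications, while a deep copy taken at snapshot time stays isolated.

```java
import java.util.ArrayList;
import java.util.List;

final class SnapshotCopyDemo {

    /** Stand-in for backend meta info; the mutable list makes the issue visible. */
    static final class MetaInfo {
        final List<String> options;
        MetaInfo(List<String> options) { this.options = options; }
        MetaInfo deepCopy() { return new MetaInfo(new ArrayList<>(options)); }
    }

    public static void main(String[] args) {
        MetaInfo live = new MetaInfo(new ArrayList<>(List.of("v1")));

        MetaInfo flat = live;             // snapshot shares the mutable list
        MetaInfo deep = live.deepCopy();  // snapshot isolated at capture time

        live.options.add("v2");           // backend keeps mutating afterwards

        System.out.println(flat.options); // [v1, v2] -> snapshot silently changed
        System.out.println(deep.options); // [v1]     -> snapshot stayed consistent
    }
}
```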


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202518473
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
 ---
@@ -63,54 +72,46 @@ public 
RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe
}
 
@SuppressWarnings("unchecked")
-   public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
+   public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
this(
snapshot.getName(),
OperatorStateHandle.Mode.valueOf(

snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
-   (TypeSerializer<K>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-   (TypeSerializer<V>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+   (TypeSerializer<K>) Preconditions.checkNotNull(
+   snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
+   (TypeSerializer<V>) Preconditions.checkNotNull(
+   snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));

Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType());
}
 
/**
 * Creates a deep copy of the itself.
 */
-   public RegisteredBroadcastBackendStateMetaInfo deepCopy() {
-   return new RegisteredBroadcastBackendStateMetaInfo<>(this);
+   @Nonnull
+   public RegisteredBroadcastStateBackendMetaInfo deepCopy() {
+   return new RegisteredBroadcastStateBackendMetaInfo<>(this);
}
 
@Nonnull
@Override
public StateMetaInfoSnapshot snapshot() {
-   Map<String, String> optionsMap = Collections.singletonMap(
-   StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
-   assignmentMode.toString());
-   Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
-   Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2);
-   String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
-   String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
-   serializerMap.put(keySerializerKey, keySerializer.duplicate());
-   serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration());
-   serializerMap.put(valueSerializerKey, valueSerializer.duplicate());
-   serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration());
-
-   return new StateMetaInfoSnapshot(
-   name,
-   StateMetaInfoSnapshot.BackendStateType.BROADCAST,
-   optionsMap,
-   serializerConfigSnapshotsMap,
-   serializerMap);
+   if (precomputedSnapshot == null) {
+   precomputedSnapshot = precomputeSnapshot();
+   }
+   return precomputedSnapshot;
--- End diff --

What if the serializers are not all immutable? Should we add an `immutable` 
field, and only return the `precomputedSnapshot` when it is true?
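
For illustration, a sketch of the suggested guard under the assumption that immutability can be checked up front; all names here are made up, not the Flink API:

```java
import java.util.function.Supplier;

final class GuardedSnapshotCache<S> {

    private final boolean immutable;   // true iff everything the snapshot captures is immutable
    private final Supplier<S> builder; // builds a fresh snapshot
    private S precomputedSnapshot;

    GuardedSnapshotCache(boolean immutable, Supplier<S> builder) {
        this.immutable = immutable;
        this.builder = builder;
    }

    S snapshot() {
        if (!immutable) {
            return builder.get();                // mutable parts: rebuild every time
        }
        if (precomputedSnapshot == null) {
            precomputedSnapshot = builder.get(); // immutable: compute once, reuse
        }
        return precomputedSnapshot;
    }
}
```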


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-14 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202519294
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparable.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ *
+ * @param <T>
--- End diff --

Description for `T` is missing


---


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-07-13 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen Thanks! Looking forward~


---


[GitHub] flink issue #6306: [FLINK-9804][state] KeyedStateBackend.getKeys() does not ...

2018-07-12 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6306
  
@aljoscha Thanks for your quick review, will address your comments while 
merging.


---


[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

2018-07-11 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6308#discussion_r201778299
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
 ---
@@ -98,8 +103,7 @@ public int getVersion() {
 
@Override
public int[] getCompatibleVersions() {
-   // we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
-   return new int[] {VERSION, 3, 2, 1};
+   return new int[]{VERSION, 4, 3, 2, 1};
--- End diff --

nit: missing a ' ' between '[]' and '{'.


---


[GitHub] flink issue #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not consider id...

2018-07-11 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6301
  
Hi @jrthe42, in general a checkpoint includes two parts of work:

- part 1: take a snapshot of the state.
- part 2: transfer the snapshot to the checkpoint destination (e.g. DFS).

Part 1 needs to be synchronous, and part 2 can be asynchronous, if I'm not wrong.
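
For illustration, a minimal sketch of the two phases (names and types are made up for the example; this is not the Flink API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class TwoPhaseCheckpointDemo {
    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state.put("count", 42L);

        // Part 1 (sync): capture a consistent snapshot before processing resumes.
        Map<String, Long> snapshot = new HashMap<>(state);

        // Part 2 (async): transfer the snapshot to durable storage (e.g. DFS).
        CompletableFuture<Void> upload = CompletableFuture.runAsync(
                () -> System.out.println("uploading " + snapshot + " to DFS"));

        state.put("count", 43L); // processing continues; the snapshot is unaffected
        upload.join();
    }
}
```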




---


[GitHub] flink issue #6306: [FLINK-9804][state] KeyedStateBackend.getKeys() does not ...

2018-07-11 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6306
  
CC @aljoscha 


---


[GitHub] flink pull request #6306: [FLINK-9804][state] KeyedStateBackend.getKeys() do...

2018-07-11 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6306

[FLINK-9804][state] KeyedStateBackend.getKeys() does not work on RocksDB 
MapState

## What is the purpose of the change

*This PR fixes the bug that KeyedStateBackend.getKeys() does not work on RocksDB MapState.*


## Brief change log

  - *Change `RocksDBKeyedStateBackend#RocksIteratorForKeysWrapper()` to let it support getting keys for RocksDB MapState.*


## Verifying this change

  - *Added `StateBackendTestBase#testMapStateGetKeys()` to guard the changes*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

- No

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9804

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6306.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6306


commit efc4096a4a6f38b3acb0b5189804f7b452218f23
Author: sihuazhou 
Date:   2018-07-11T12:35:22Z

fix KeyedStateBackend.getKeys() for RocksDBMapState.




---


[GitHub] flink pull request #6301: [FLINK-9794] [jdbc] JDBCOutputFormat does not cons...

2018-07-11 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6301#discussion_r201676322
  
--- Diff: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 ---
@@ -111,110 +134,117 @@ public void writeRecord(Row row) throws IOException {
    if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) {
        LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
    }
-   try {
+   synchronized (this) {
-   try {
+   synchronized (this) {
--- End diff --

Why do we need to synchronize on `this` here?


---


[GitHub] flink pull request #6133: [FLINK-9351][Distributed Coordination] RM stop ass...

2018-07-02 Thread sihuazhou
Github user sihuazhou closed the pull request at:

https://github.com/apache/flink/pull/6133


---


[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

2018-07-02 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6228
  
+1 from my side


---


[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

2018-07-02 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6194
  
Hi @tillrohrmann I added a unit test 
`FsCheckpointStorageTest#testResolveCheckpointStorageLocation()` to guard the 
changes. But I still use one `mock` for creating two different file systems (one 
for checkpoint, another for savepoint), because without mocking I can only create 
a LocalFileSystem in the test. Could you please have a look again?


---


[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

2018-07-02 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6194
  
@tillrohrmann Thanks for your review and suggestion, will change it tonight~


---


[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

2018-07-01 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6194
  
@yanghua Thanks for the review, I rebased the PR.


---


[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6228
  
@StefanRRichter very nice PR, I only had some very minor comments.


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199325117
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Interface for collection that gives in order access to elements w.r.t their priority.
+ *
+ * @param <T> type of elements in the ordered set.
+ */
+@Internal
+public interface InternalPriorityQueue<T> {
+
+   /**
+    * Retrieves and removes the first element (w.r.t. the order) of this set,
+    * or returns {@code null} if this set is empty.
+    *
+    * @return the first element of this ordered set, or {@code null} if this set is empty.
+    */
+   @Nullable
+   T poll();
+
+   /**
+    * Retrieves, but does not remove, the element (w.r.t. order) of this set,
+    * or returns {@code null} if this set is empty.
+    *
+    * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
+    */
+   @Nullable
+   T peek();
+
+   /**
+    * Adds the given element to the set, if it is not already contained.
+    *
+    * @param toAdd the element to add to the set.
+    * @return true if the operation changed the head element or if is it unclear if the head element changed.
+    * Only returns false iff the head element was not changed by this operation.
+    */
+   boolean add(@Nonnull T toAdd);
+
+   /**
+    * Removes the given element from the set, if is contained in the set.
+    *
+    * @param toRemove the element to remove.
+    * @return true if the operation changed the head element or if is it unclear if the head element changed.
--- End diff --

nit: unclosed HTML tag ``


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199325112
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Interface for collection that gives in order access to elements w.r.t their priority.
+ *
+ * @param <T> type of elements in the ordered set.
+ */
+@Internal
+public interface InternalPriorityQueue<T> {
+
+   /**
+    * Retrieves and removes the first element (w.r.t. the order) of this set,
+    * or returns {@code null} if this set is empty.
+    *
+    * @return the first element of this ordered set, or {@code null} if this set is empty.
+    */
+   @Nullable
+   T poll();
+
+   /**
+    * Retrieves, but does not remove, the element (w.r.t. order) of this set,
+    * or returns {@code null} if this set is empty.
+    *
+    * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
+    */
+   @Nullable
+   T peek();
+
+   /**
+    * Adds the given element to the set, if it is not already contained.
+    *
+    * @param toAdd the element to add to the set.
+    * @return true if the operation changed the head element or if is it unclear if the head element changed.
--- End diff --

nit: unclosed HTML tag ``


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199325523
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+   /** Serialized empty value to insert into RocksDB. */
+   private static final byte[] DUMMY_BYTES = 
"0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+   /** The RocksDB instance that serves as store. */
+   @Nonnull
+   private final RocksDB db;
+
+   /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
+   @Nonnull
+   private final ColumnFamilyHandle columnFamilyHandle;
+
+   /** Read options for RocksDB. */
+   @Nonnull
+   private final ReadOptions readOptions;
+
+   /**
+* Serializer for the contained elements. The lexicographical order of 
the bytes of serialized objects must be
+* aligned with their logical order.
+*/
+   @Nonnull
+   private final TypeSerializer byteOrderProducingSerializer;
+
+   /** Wrapper to batch all writes to RocksDB. */
+   @Nonnull
+   private final RocksDBWriteBatchWrapper batchWrapper;
+
+   /** The key-group id of all elements stored in this instance. */
+   @Nonnegative
+   private final int keyGroupId;
+
+   /** The key-group id in serialized form. */
+   @Nonnull
+   private final byte[] groupPrefixBytes;
+
+   /** Output stream that helps to serialize elements. */
+   @Nonnull
+   private final ByteArrayOutputStreamWithPos outputStream;
+
+   /** Output view that helps to serialize elements, must wrap the output 
stream. */
+   @Nonnull
+   private final DataOutputViewStreamWrapper outputView;
+
+   public RocksDBOrderedStore(
+   @Nonnegative int keyGroupId,
+   @Nonnull RocksDB db,
+   @Nonnull ColumnFamilyHandle columnFamilyHandle,
+   @Nonnull ReadOptions readOptions,
+   @Nonnull TypeSerializer byteOrderProducingSerializer,
+   @Nonnull ByteArrayOutputStreamWithPos outputStream,
+   @Nonnull DataOutputViewStreamWrapper outputView,
+   @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+   this.db = db;
+   this.columnFamilyHandle = columnFamilyHandle;
+   

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199325172
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
 ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+   implements InternalPriorityQueue<T> {
+
+   /** A heap of heap sets. Each sub-heap represents the partition for a key-group. */
+   @Nonnull
+   private final HeapPriorityQueue<PQ> keyGroupHeap;
+
+   /** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
+   @Nonnull
+   private final PQ[] keyGroupLists;
+
+   /** Function to extract the key from contained elements. */
+   @Nonnull
+   private final KeyExtractorFunction<T> keyExtractor;
+
+   /** The total number of key-groups (in the job). */
+   @Nonnegative
+   private final int totalKeyGroups;
+
+   /** The smallest key-group id with a subpartition managed by this ordered set. */
+   @Nonnegative
+   private final int firstKeyGroup;
+
+   @SuppressWarnings("unchecked")
+   public KeyGroupPartitionedPriorityQueue(
+   @Nonnull KeyExtractorFunction<T> keyExtractor,
+   @Nonnull Comparator<T> elementComparator,
+   @Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalKeyGroups) {
+
+   this.keyExtractor = keyExtractor;
+   this.totalKeyGroups = totalKeyGroups;
+   this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+   this.keyGroupLists = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
+   this.keyGroupHeap = new HeapPriorityQueue<>(
+   new InternalPriorityQueueComparator<>(elementComparator),
+   keyGroupRange.getNumberOfKeyGroups());
+   for (int i = 0; i < keyGroupLists.length; i++) {
+   final PQ keyGroupCache =
+   orderedCacheFactory.create(firstKeyGroup + i, elementComparator);
+   keyGroupLists[i] = keyGroupCache;
+   keyGroupHeap.add(keyGroupCache);
+   }
+   }
+   }
+
+   @Nullable
+   @Override
+   public T poll() {
+   final PQ headList = keyGroupHeap.peek();
+   final T head = headList.poll();
+   keyGroupHeap.adjustModifiedElement(headList);
+   return head;
+   }
+
+   @Nullable
+   @Override
+   public T peek() {
+   return keyGroupHeap.peek().peek();
+   }
+
+   @Override
+   public boolean add(@Nonnull T toAdd) {
+   final PQ list = getListForElementKeyGroup(toAdd);
+
+   // the branch checks if the head element has 

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199325190
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
--- End diff --

Does `RocksDBOrderedStore` mean `RocksDBOrderedSetStore`?


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199320321
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+   /** Serialized empty value to insert into RocksDB. */
+   private static final byte[] DUMMY_BYTES = 
"0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+   /** The RocksDB instance that serves as store. */
+   @Nonnull
+   private final RocksDB db;
+
+   /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
+   @Nonnull
+   private final ColumnFamilyHandle columnFamilyHandle;
+
+   /** Read options for RocksDB. */
+   @Nonnull
+   private final ReadOptions readOptions;
+
+   /**
+* Serializer for the contained elements. The lexicographical order of 
the bytes of serialized objects must be
+* aligned with their logical order.
+*/
+   @Nonnull
+   private final TypeSerializer byteOrderProducingSerializer;
+
+   /** Wrapper to batch all writes to RocksDB. */
+   @Nonnull
+   private final RocksDBWriteBatchWrapper batchWrapper;
+
+   /** The key-group id of all elements stored in this instance. */
+   @Nonnegative
+   private final int keyGroupId;
+
+   /** The key-group id in serialized form. */
+   @Nonnull
+   private final byte[] groupPrefixBytes;
+
+   /** Output stream that helps to serialize elements. */
+   @Nonnull
+   private final ByteArrayOutputStreamWithPos outputStream;
+
+   /** Output view that helps to serialize elements, must wrap the output 
stream. */
+   @Nonnull
+   private final DataOutputViewStreamWrapper outputView;
+
+   public RocksDBOrderedStore(
+   @Nonnegative int keyGroupId,
+   @Nonnull RocksDB db,
+   @Nonnull ColumnFamilyHandle columnFamilyHandle,
+   @Nonnull ReadOptions readOptions,
+   @Nonnull TypeSerializer byteOrderProducingSerializer,
+   @Nonnull ByteArrayOutputStreamWithPos outputStream,
+   @Nonnull DataOutputViewStreamWrapper outputView,
+   @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+   this.db = db;
+   this.columnFamilyHandle = columnFamilyHandle;
+   

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199320135
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
 ---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
+ * two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth
+ * about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited
+ * capacity. The cache is used to improve performance of accesses to the underlying store and contains contains an
--- End diff --

duplicated `contains`


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199320254
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+   /** Serialized empty value to insert into RocksDB. */
+   private static final byte[] DUMMY_BYTES = 
"0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+   /** The RocksDB instance that serves as store. */
+   @Nonnull
+   private final RocksDB db;
+
+   /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
+   @Nonnull
+   private final ColumnFamilyHandle columnFamilyHandle;
+
+   /** Read options for RocksDB. */
+   @Nonnull
+   private final ReadOptions readOptions;
+
+   /**
+* Serializer for the contained elements. The lexicographical order of 
the bytes of serialized objects must be
+* aligned with their logical order.
+*/
+   @Nonnull
+   private final TypeSerializer byteOrderProducingSerializer;
+
+   /** Wrapper to batch all writes to RocksDB. */
+   @Nonnull
+   private final RocksDBWriteBatchWrapper batchWrapper;
+
+   /** The key-group id of all elements stored in this instance. */
+   @Nonnegative
+   private final int keyGroupId;
+
+   /** The key-group id in serialized form. */
+   @Nonnull
+   private final byte[] groupPrefixBytes;
+
+   /** Output stream that helps to serialize elements. */
+   @Nonnull
+   private final ByteArrayOutputStreamWithPos outputStream;
+
+   /** Output view that helps to serialize elements, must wrap the output 
stream. */
+   @Nonnull
+   private final DataOutputViewStreamWrapper outputView;
+
+   public RocksDBOrderedStore(
+   @Nonnegative int keyGroupId,
+   @Nonnull RocksDB db,
+   @Nonnull ColumnFamilyHandle columnFamilyHandle,
+   @Nonnull ReadOptions readOptions,
+   @Nonnull TypeSerializer byteOrderProducingSerializer,
+   @Nonnull ByteArrayOutputStreamWithPos outputStream,
+   @Nonnull DataOutputViewStreamWrapper outputView,
+   @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+   this.db = db;
+   this.columnFamilyHandle = columnFamilyHandle;
+   

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199320146
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
 ---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
+ * two different storage types. The first storage is a (potentially slow) ordered set store manages the ground truth
+ * about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited
+ * capacity. The cache is used to improve performance of accesses to the underlying store and contains contains an
+ * ordered (partial) view on the top elements in the ordered store. We are currently applying simple write-trough
--- End diff --

typo: `write-trough` -> `write-through`


---


[GitHub] flink issue #6125: [FLINK-9532] Flink Overview of Jobs Documentation Incorre...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6125
  
@yanghua sorry, I'm not a native English speaker, so I think I'm not 
suitable to review this...


---


[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6132
  
Hi @tillrohrmann I updated the PR, could you please have a look again?


---


[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r199315500
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -984,6 +985,15 @@ private void startCheckpointScheduler(final 
CheckpointCoordinator checkpointCoor
operatorBackPressureStats.orElse(null)));
}
 
+   @Override
+   public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
--- End diff --

My previous thought was that the `RM` needed to notify the `JM` of the 
`allocationIds` that were assigned to it, because it was possible that the 
`SlotManager` had already assigned slots to the `JM`, but the `TM` was killed 
before the `JM` established a connection. This was mainly to address the issue in 
https://issues.apache.org/jira/browse/FLINK-9351, but with the current approach 
you suggested, I think the problem in FLINK-9351 has been fixed along the way.


---


[GitHub] flink issue #6132: [FLINK-9456][Distributed Coordination]Let ResourceManager...

2018-06-28 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6132
  
@tillrohrmann Thanks for your review and good suggestions, I'll change the 
code accordingly.


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146934
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

In fact, what I mean is that if you want to throw an exception here, you 
could throw it as `throw new RuntimeException("exception message", 
originalException)`; this way the original exception won't be swallowed.
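
A small self-contained example of the difference (`mightFail` is an illustrative stand-in):

```java
final class ExceptionWrappingDemo {

    static void mightFail() throws Exception { // illustrative stand-in
        throw new Exception("disk full");
    }

    public static void main(String[] args) {
        try {
            mightFail();
        } catch (Exception originalException) {
            // Passing the original as the cause keeps its stack trace in the output.
            throw new RuntimeException("failed to flush cache entry", originalException);
        }
    }
}
```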


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198146056
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
--- End diff --

Yes.


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198135717
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related method
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a ShareBufferNode to cache.
+* @param nodeId id of the event
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the sharedBufferNode from state iff the node has not been 
quered during this turn process.
+* @param nodeId id of the event
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the event and node in map to state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
--- End diff --

I would suggest using `entries.putAll()` instead, since it could give better 
performance when you are using the RocksDB backend. A rough sketch follows 
below.
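
For illustration, a minimal sketch of the suggested change, assuming 
`entryCache`/`eventsBufferCache` are plain `java.util.Map`s and 
`entries`/`eventsBuffer` are Flink `MapState` handles (names taken from the 
diff; clearing the caches afterwards is my own assumption):

```java
public void flushCache() throws Exception {
    // A single putAll() per state lets the RocksDB backend batch the
    // writes instead of issuing one state access per cached element.
    if (!entryCache.isEmpty()) {
        entries.putAll(entryCache);
        entryCache.clear();
    }
    if (!eventsBufferCache.isEmpty()) {
        eventsBuffer.putAll(eventsBufferCache);
        eventsBufferCache.clear();
    }
}
```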


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198136425
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related methods
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a SharedBufferNode to cache.
+* @param nodeId id of the node
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the SharedBufferNode from state iff the node has not been 
queried during this round of processing.
+* @param nodeId id of the node
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the cached events and nodes to the state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
+   try {
+   eventsBuffer.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

Same here, do not swallow the original Exception.


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198135796
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related methods
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a SharedBufferNode to cache.
+* @param nodeId id of the node
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the SharedBufferNode from state iff the node has not been 
queried during this round of processing.
+* @param nodeId id of the node
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the cached events and nodes to the state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
+   }
+   }
+   );
+
+   eventsBufferCache.forEach((k, v) -> {
--- End diff --

Same here.


---


[GitHub] flink pull request #6205: [FLINK-9642]Reduce the count to deal with state du...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6205#discussion_r198136431
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
 ---
@@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws 
Exception {
 * @throws Exception Thrown if the system cannot access the state.
 */
public void releaseEvent(EventId eventId) throws Exception {
-   Lockable eventWrapper = eventsBuffer.get(eventId);
+   Lockable eventWrapper = getEvent(eventId);
if (eventWrapper != null) {
if (eventWrapper.release()) {
eventsBuffer.remove(eventId);
+   eventsBufferCache.remove(eventId);
} else {
-   eventsBuffer.put(eventId, eventWrapper);
+   cacheEvent(eventId, eventWrapper);
}
}
}
 
+   // Cache related methods
+
+   /
+   //  Put
+   /
+
+   /**
+* Put an event to cache.
+* @param eventId id of the event
+* @param event event body
+*/
+   private void cacheEvent(EventId eventId, Lockable event) {
+   this.eventsBufferCache.put(eventId, event);
+   }
+
+   /**
+* Put a SharedBufferNode to cache.
+* @param nodeId id of the node
+* @param entry SharedBufferNode
+*/
+   private void cacheEntry(NodeId nodeId, Lockable 
entry) {
+   this.entryCache.put(nodeId, entry);
+   }
+
+   /
+   // Get
+   /
+
+   /**
+* Try to get the SharedBufferNode from state iff the node has not been 
queried during this round of processing.
+* @param nodeId id of the node
+* @return SharedBufferNode
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   private Lockable getEntry(NodeId nodeId) throws 
Exception {
+   Lockable entry = entryCache.get(nodeId);
+   return  entry != null ? entry : entries.get(nodeId);
+   }
+
+   private Lockable getEvent(EventId eventId) throws Exception {
+   Lockable event = eventsBufferCache.get(eventId);
+   return event != null ? event : eventsBuffer.get(eventId);
+   }
+
+   /**
+* Flush the cached events and nodes to the state.
+* @throws Exception Thrown if the system cannot access the state.
+*/
+   public void flushCache() throws Exception {
+   entryCache.forEach((k, v) -> {
+   try {
+   entries.put(k, v);
+   } catch (Exception e) {
+   throw new RuntimeException();
--- End diff --

I would suggest not swallowing the original exception here.
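
A minimal sketch of what I mean, keeping the original exception as the cause 
(the message text is just an example):

```java
entryCache.forEach((k, v) -> {
    try {
        entries.put(k, v);
    } catch (Exception e) {
        // propagate the original exception as the cause instead of swallowing it
        throw new RuntimeException("Failed to flush cached entry " + k, e);
    }
});
```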


---


[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198129785
  
--- Diff: flink-tests/src/test/resources/log4j-test.properties ---
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
--- End diff --

This looks like it should be reverted.


---


[GitHub] flink pull request #6208: [FLINK-9634] Disable local recovery scheduling if ...

2018-06-26 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6208#discussion_r198129120
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSlotPoolFactory.java
 ---
@@ -75,10 +82,21 @@ public static DefaultSlotPoolFactory fromConfiguration(
final Time rpcTimeout = 
AkkaUtils.getTimeoutAsTime(configuration);
final Time slotIdleTimeout = 
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
 
+   final SchedulingStrategy schedulingStrategy = 
selectSchedulingStrategy(configuration);
+
return new DefaultSlotPoolFactory(
rpcService,
+   schedulingStrategy,
SystemClock.getInstance(),
rpcTimeout,
slotIdleTimeout);
}
+
+   private static SchedulingStrategy 
selectSchedulingStrategy(Configuration configuration) {
+   if 
(configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)) {
+   return 
PreviousAllocationSchedulingStrategy.getInstance();
+   } else {
+   return 
LocationPreferenceSchedulingStrategy.getInstance();
--- End diff --

Does it make sense to use the `LocationPreferenceSchedulingStrategy` for 
the initial scheduling even when "local recovery" is enabled?


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-25 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015825
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSerializers) {
+   originalValues.add(typeSerializer.deserialize(source));
+   }
+   return composeValueInternal(originalValues);
+   }
+
+   @Override
+   public T deseriali

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-25 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015848
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSerializers) {
+   originalValues.add(typeSerializer.deserialize(source));
+   }
+   return composeValueInternal(originalValues);
+   }
+
+   @Override
+   public T deseriali

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-25 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015385
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
--- End diff --

Same here: this might result in bad performance because of the random access 
on the list.


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-25 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015282
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
--- End diff --

I would suggest using an `Iterator` instead of `xxx.get(i)` here, because if 
`originalSerializers` (or one of the other lists) is a `LinkedList`, the 
performance could be bad. A minimal sketch follows below.
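
A minimal sketch of the iterator-based `copy(from, reuse)`, assuming the 
generic parameters stripped by the mail archive are 
`TypeSerializer<Object>`/`Object` and adding a `java.util.Iterator` import:

```java
List<Object> copied = new ArrayList<>(originalFromValues.size());
Iterator<TypeSerializer<Object>> serializerIt = originalSerializers.iterator();
Iterator<Object> fromIt = originalFromValues.iterator();
Iterator<Object> reuseIt = originalReuseValues.iterator();
// lockstep iteration stays O(n) even when the lists are LinkedLists
while (serializerIt.hasNext()) {
    copied.add(serializerIt.next().copy(fromIt.next(), reuseIt.next()));
}
return composeValueInternal(copied);
```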


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-25 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198015105
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
--- End diff --

nit: if we don't use the high-level stream API, we could do this in a single 
loop with better performance.
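
A minimal sketch of the single-loop variant (under the same generic-type 
assumption as above):

```java
@Override
public int getLength() {
    int totalLength = 0;
    for (TypeSerializer<Object> serializer : originalSerializers) {
        int length = serializer.getLength();
        if (length < 0) {
            // a single variable-length field makes the whole composite variable-length
            return -1;
        }
        totalLength += length;
    }
    return totalLength;
}
```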


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-25 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198014886
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
--- End diff --

If `originalSerializers` or `originalValues` is something like a 
`LinkedList`, then `originalSerializers.get(i)` and `originalValues.get(i)` 
will perform very poorly. I think we should use an `iterator` here.
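
For example, `serialize()` could walk both lists in lockstep (same 
generic-type assumption as above):

```java
Iterator<TypeSerializer<Object>> serializerIt = originalSerializers.iterator();
Iterator<Object> valueIt = originalValues.iterator();
while (serializerIt.hasNext()) {
    serializerIt.next().serialize(valueIt.next(), target);
}
```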


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-25 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r198014021
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
--- End diff --

I see, but this looks a bit weird: `stateFactories` is only used to look up 
the corresponding state factory for the `stateDesc`, and we need to create it 
first on every call to `createStateAndWrapWithTtlIfEnabled()`. The flow of 
`createStateAndWrapWithTtlIfEnabled()` looks like:

- create a map of state factories (`stateFactories`).
- use `stateFactories` to look up the corresponding state factory for the 
`stateDesc`.
- ...

In that case, maybe using a `switch case` (or an `if/else` chain) to find the 
corresponding state factory is better; at least that way we don't need to 
create a map of state factories on every call (a rough sketch follows below). 
What do you think?
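
A rough sketch of what I mean, using an `instanceof` chain since Java cannot 
`switch` over `Class` objects; the `createXxxState` helpers are the ones from 
the diff, and the casts are only illustrative:

```java
@SuppressWarnings("unchecked")
private <N, SV, S extends State, IS extends S> IS createState(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc) throws Exception {
    if (stateDesc instanceof ValueStateDescriptor) {
        return (IS) createValueState(namespaceSerializer, stateDesc);
    } else if (stateDesc instanceof ListStateDescriptor) {
        return (IS) createListState(namespaceSerializer, stateDesc);
    } else if (stateDesc instanceof MapStateDescriptor) {
        return (IS) createMapState(namespaceSerializer, stateDesc);
    } else {
        throw new FlinkRuntimeException("Unsupported state descriptor: " + stateDesc);
    }
}
```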


---


[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

2018-06-25 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
@StefanRRichter Thanks for your reply! I think that makes sense; there is 
still a workaround the user can take. `+1` to implementing the current 
approach first.


---


[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

2018-06-24 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
Maybe let me elaborate on the TTL checking condition in detail. Overall, the 
checking condition contains two parts and looks like 
`(current_ts - update_ts) - time_shift_offset >= TTL`.

The `time_shift_offset` is the shift offset that we should apply when 
checking the TTL.

- For records with `update_ts` > `checkpoint_ts`, we know they were created 
(or updated) after the last restore, so we don't need to apply any shift to 
them. That shift offset is `0`.

- For records with `update_ts` <= `checkpoint_ts`, we know they were created 
(or updated) before the last restore, so we need to apply the shift to them; 
the shift offset is `recovery_ts - checkpoint_ts`.

In our current code we don't do this time-align work, which is equivalent to 
the special case of the above condition where `time_shift_offset` is always 
`0`. A small sketch of the combined check follows below.
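
Putting the two cases together, a minimal sketch of the check (all 
timestamps in epoch millis, names as in the discussion above):

```java
boolean isExpired(long currentTs, long updateTs, long checkpointTs,
        long recoveryTs, long ttl) {
    // records touched after the last restore need no shift; records
    // restored from the checkpoint are shifted by the system down time
    long timeShiftOffset = updateTs > checkpointTs ? 0L : recoveryTs - checkpointTs;
    return (currentTs - updateTs) - timeShiftOffset >= ttl;
}
```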


---


[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

2018-06-24 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
I'm still a bit worried about the time-align problem on recovery, because 
I've met several cases that would have become a disaster in production if we 
didn't do the time-align on recovery. One instance: we used a DFS to store 
the checkpoint data, and the DFS went into safe-mode because of some 
problems; we took several hours to notice that and also took some time to 
address the issue. After addressing the DFS's issue, users' jobs were resumed 
and began to run correctly. In this case, if we don't do the time-align on 
recovery, users' state may already be totally expired (whenever TTL <= the 
`system down time`).

I had a second thought on this problem, and I think maybe we could do it 
without a full scan of the records; the approach is outlined below.

- 1. We need to remember the timestamp when performing the checkpoint, let's 
call it `checkpoint_ts`.
- 2. We also need to remember the timestamp when recovering from the 
checkpoint, let's say it `recovery_ts`.
- 3. For each record, we remember its last update timestamp, let's say it 
`update_ts`.
- 4. The current timestamp is `current_ts`.
- 5. Then we can use the condition `checkpoint_ts - update_ts + current_ts - 
recovery_ts >= TTL` to check whether the record is expired. If it's true, the 
record is expired; otherwise the record is still alive.

What do you think? @azagrebin, and @StefanRRichter it would be really nice 
to learn your opinion on this problem.


---


[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

2018-06-22 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6186
  
@azagrebin thanks for addressing the concerns, looks good from my side now. 
let's wait for @StefanRRichter 's review.


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197329976
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
--- End diff --

I would suggest giving `originalValues` an initial capacity, e.g. `List 
originalValues = new ArrayList(originalSerializers.size());`.


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331145
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
--- End diff --

It would be better to check that the args are not null, or simply use the 
`@Nonnull` annotation. A minimal sketch follows below.
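
A minimal sketch of the null checks, using Flink's `Preconditions` (which 
would need an additional import of `org.apache.flink.util.Preconditions` in 
this class):

```java
private TtlStateFactory(
        KeyedStateFactory originalStateFactory,
        TtlConfig ttlConfig,
        TtlTimeProvider timeProvider) {
    this.originalStateFactory = Preconditions.checkNotNull(originalStateFactory);
    this.ttlConfig = Preconditions.checkNotNull(ttlConfig);
    this.timeProvider = Preconditions.checkNotNull(timeProvider);
    this.stateFactories = createStateFactories();
}
```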


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330095
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSerializers) {
+   originalValues.add(typeSerializer.deserialize(source));
+   }
+   return composeValueInternal(originalValues);
+   }
+
+   @Override
+   public T deseriali

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330813
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ---
@@ -48,7 +48,8 @@
KeyedStateBackend,
Snapshotable, 
Collection>,
Closeable,
-   CheckpointListener {
+   CheckpointListener,
+   KeyedStateFactory{
--- End diff --

I think this is missing a space before the `{`.


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS createState(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception {
+   StateFactory stateFactory = 
stateFactories.get(stateDesc.getClass());

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197329533
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
--- End diff --

I think this check is a bug: the parameter shadows the field, so both sides of the comparison refer to the same list and the precondition can never fail.
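
Presumably the intent was to compare the incoming list against this serializer's own list; renaming the parameter makes that possible. A sketch of the fix (the exact generic types are my assumption, since they are not visible here):

    private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer<Object>> newSerializers) {
        // Compare the incoming list against this serializer's own field,
        // not the parameter against itself.
        Preconditions.checkArgument(newSerializers.size() == this.originalSerializers.size());
        return createSerializerInstance(newSerializers);
    }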


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
--- End diff --

Why couldn't this be static? The map is the same for every instance.
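
A hedged sketch of the pattern I have in mind (a hypothetical, self-contained analogue, not the PR's actual types): if the creator functions take the instance explicitly instead of capturing `this` through bound method references, the dispatch map can be built once per class:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BiFunction;

    class StaticDispatchDemo {
        // Built once when the class is loaded, not once per instance. The
        // entries are stateless: they receive the instance explicitly via an
        // unbound method reference instead of capturing 'this'.
        private static final Map<Class<?>, BiFunction<StaticDispatchDemo, String, Object>> FACTORIES = new HashMap<>();
        static {
            FACTORIES.put(String.class, StaticDispatchDemo::createValueState);
        }

        private Object createValueState(String descriptor) {
            return "value-state:" + descriptor;
        }

        Object create(Class<?> descriptorClass, String descriptor) {
            return FACTORIES.get(descriptorClass).apply(this, descriptor);
        }
    }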


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS createState(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception {
+   StateFactory stateFactory = 
stateFactories.get(stateDesc.getClass());

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330596
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSerializers) {
+   originalValues.add(typeSerializer.deserialize(source));
+   }
+   return composeValueInternal(originalValues);
+   }
+
+   @Override
+   public T deseriali

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS createState(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception {
+   StateFactory stateFactory = 
stateFactories.get(stateDesc.getClass());

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS createState(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception {
+   StateFactory stateFactory = 
stateFactories.get(stateDesc.getClass());

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197192870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
+   Preconditions.checkArgument(ttl.toMilliseconds() >= 0,
--- End diff --

Maybe we should also check beforehand that `ttl` is not null, and I wonder whether `ttl.toMilliseconds() == 0` makes any sense at all?
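
To illustrate, the checks I have in mind (just a sketch, assuming `ttl` is Flink's `Time` and that a zero TTL should be rejected too):

    Preconditions.checkNotNull(ttl, "TTL value cannot be null.");
    // A TTL of 0 would expire state immediately on write, which is hardly ever intended.
    Preconditions.checkArgument(ttl.toMilliseconds() > 0, "TTL value must be strictly positive.");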


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197188305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

Okay, I see, this is tricky; I agree that it should be addressed in another PR. We need to figure out a proper way to do it.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197114442
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

Additionally, if we don't do the time alignment on recovery, then what is the safe `TTL` value to set for a job? (This is the question users always ask us when they try to use TTL to control the state size; we implemented it in a hacky way based on `TtlDB`.)


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197111980
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

I'm really not sure we should leave it for now. If we do, it will be a headache in practical production.

As a very common situation, consider a job reading data from Kafka where the user sets `TTL = 2hours` because he knows the data's latency is definitely less than 2 hours; this way he can use the TTL to safely control the whole state size and still get an exact result. But if he finds that the job needs to scale up, he has to trigger a savepoint and rescale the job from it. What if some problem prevents him from restoring the job from that savepoint quickly, say it takes 30 minutes to recover? Then the result becomes inaccurate.

Even if the user never needs to trigger a savepoint, what if the job hits some problem (maybe with some machine) and loops in "failed-restart-failed-...", and after 2 hours we fix the problem and the job automatically resumes, but all the state has already expired? I think this is a disaster for the user.

Yes, when using `EventTime` this problem won't arise, but `ProcessingTime` is a very common use case (in our production, most jobs' time characteristic is `ProcessingTime`).

I know Flink's timer service also doesn't do time alignment on recovery, but state TTL is a bit different from timers: when registering a timer, what the user gives the API is an absolute time, whereas when setting a TTL the user gives only a relative time; it is us who convert that relative time into an absolute time to implement the TTL.
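
To make this concrete, a rough sketch of the kind of shift I mean (a hypothetical helper, not existing code; the two wall-clock timestamps would have to come from the checkpoint metadata and the restore time):

    // Shift an absolute expiration timestamp by the job's downtime, so a
    // relative TTL keeps its remaining lifetime across a savepoint restore.
    static long shiftOnRestore(long expirationTimestamp, long checkpointWallClock, long restoreWallClock) {
        long downtime = Math.max(0L, restoreWallClock - checkpointWallClock);
        // Guard entries stored with Long.MAX_VALUE ("never expires") against overflow.
        return expirationTimestamp > Long.MAX_VALUE - downtime
            ? Long.MAX_VALUE
            : expirationTimestamp + downtime;
    }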



---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197080576
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

If we need to "update on read", then this is a bit confusing to me. Currently we attach a TTL to every list item, so the "update on read" should be scoped to the list item, not the whole list. So it feels to me that having `updateTs` accept an `Iterable` would be more reasonable. What do you think?
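
Something like this is what I have in mind (sketch only, reusing the names and imports already in the diff):

    private void updateTs(Iterable<TtlValue<T>> ttlValues) throws Exception {
        // Scope the "update on read" to each list item: re-stamp every
        // unexpired element, whatever kind of Iterable the backend returns.
        List<TtlValue<T>> unexpiredWithUpdatedTs = StreamSupport
            .stream(ttlValues.spliterator(), false)
            .filter(v -> !expired(v))
            .map(this::rewrapWithNewTs)
            .collect(Collectors.toList());
        if (!unexpiredWithUpdatedTs.isEmpty()) {
            original.update(unexpiredWithUpdatedTs);
        }
    }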


---


[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6194
  
CC @zentol 


---


[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...

2018-06-21 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6194

[FLINK-9633][checkpoint] Use savepoint path's file system to create 
checkpoint output stream

## What is the purpose of the change

*This PR fixes the problem that Flink doesn't use the savepoint path's file system to create the checkpoint output stream on the TM side.*

## Brief change log

  - *Use Savepoint path's file system to create checkpoint output stream.*


## Verifying this change

  - *Added 
`StreamTaskTest#testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint()`
 to verify the changes*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - No


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9633

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6194


commit f006416f652485bd59124a57774fdeaafa81824b
Author: sihuazhou 
Date:   2018-06-21T09:42:54Z

Use Savepoint path's file system to create checkpoint output stream.




---


[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197008798
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
-
-   R request;
-   if (isFileUpload()) {
-   final Path path = 
ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-   if (path == null) {
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Client 
did not upload a file."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
-   }
-   //noinspection unchecked
-   request = (R) new FileUpload(path);
-   } else if (msgContent.capacity() == 0) {
-   try {
-   request = MAPPER.readValue("{}", 
untypedResponseMessageHeaders.getRequestClass());
-   } catch (JsonParseException | 
JsonMappingException je) {
-   log.error("Request did not conform to 
expected format.", je);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Bad 
request received."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
+   final ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+
+   try (FileUploads uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx)) {
+
+   R request;
+   if (msgContent.capacity() == 0) {
+   try {
+   request = 
MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+   } catch (JsonParseException | 
JsonMappingException je) {
+   log.error("Request did not 
conform to expected format.", je);
+   HandlerUtils.sendErrorResponse(
+   ctx,
+   httpRequest,
+   new 
ErrorResponseBody("Bad request received."),
+   
HttpResponseStatus.BAD_REQUEST,
+   responseHeaders);
+   return;
+   }
+   } else {
+   try {
+   ByteBufInputStream in = new 
ByteBufInputStream(msgContent);
--- End diff --

I would suggest using try-with-resources to make sure `in` gets closed.
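
i.e., roughly this shape (the same calls as in the diff, just wrapped so the stream is closed even when deserialization throws):

    try (ByteBufInputStream in = new ByteBufInputStream(msgContent)) {
        request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
    }
    // 'in' is closed automatically here, on both the success and the error path.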


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001110
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user value of state with TTL
+ */
+class TtlReducingState
+   extends AbstractTtlState, InternalReducingState>>
+   implements InternalReducingState {
+   TtlReducingState(
+   InternalReducingState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public T get() throws Exception {
+   return getInternal();
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   original.add(wrapWithTs(value, Long.MAX_VALUE));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the TTL check for `original.mergeNamespaces()`? We need to query the state when merging namespaces.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196995820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the `TTL` check for `original.mergeNamespaces()`? We need to query the state when merging namespaces.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196998472
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the user entry value of state with TTL
+ */
+class TtlListState extends
+   AbstractTtlState, List>, InternalListState>>
+   implements InternalListState {
+   TtlListState(
+   InternalListState> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable get() throws Exception {
+   Iterable> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : 
ttlValue;
+   if (updateTsOnRead) {
+   List> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List> ttlValue) throws Exception {
+   List> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private  List collect(Iterable iterable) {
--- End diff --

If we called `getInternal()` in `get()` and made `updateTs()` accept an `Iterable`, then this method could probably be removed. (Or at least, we should check whether the `iterable` is itself a `List`; if so, we could cast it and return immediately.)
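
For the second option, roughly (a sketch on top of the existing helper):

    private <E> List<E> collect(Iterable<E> iterable) {
        if (iterable instanceof List) {
            // Already materialized: avoid copying. (Unchecked cast, but safe
            // because the element type is unchanged.)
            return (List<E>) iterable;
        }
        return StreamSupport.stream(iterable.spliterator(), false)
            .collect(Collectors.toList());
    }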


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197004891
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

The `expirationTimestamp` is an absolute timestamp; should we shift the timestamps of `TtlValue`s on checkpoint & recovery? For example, suppose a user uses `ProcessingTime` as the time characteristic and sets `TTL = 10min`. For some reason he triggers a savepoint, and 11 minutes later he recovers the job from that savepoint; if we don't shift the timestamps, all the state will have expired.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param  The type of key the state is associated to
+ * @param  The type of the namespace
+ * @param  Type of the value added to the state
+ * @param  The type of the accumulator (intermediate aggregate state).
+ * @param  Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState
+   extends AbstractTtlState, 
InternalAggregatingState, OUT>>
+   implements InternalAggregatingState {
+
+   TtlAggregatingState(
+   InternalAggregatingState, OUT> 
originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer valueSerializer,
+   TtlAggregateFunction aggregateFunction) {
+   super(originalState, config, timeProvider, valueSerializer);
+   aggregateFunction.stateClear = originalState::clear;
+   aggregateFunction.updater = originalState::updateInternal;
+   }
+
+   @Override
+   public OUT get() throws Exception {
+   return original.get();
+   }
+
+   @Override
+   public void add(IN value) throws Exception {
+   original.add(value);
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public ACC getInternal() throws Exception {
+   return getWithTtlCheckAndUpdate(original::getInternal, 
original::updateInternal);
+   }
+
+   @Override
+   public void updateInternal(ACC valueToStore) throws Exception {
+   original.updateInternal(wrapWithTs(valueToStore));
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection sources) throws 
Exception {
+   original.mergeNamespaces(target, sources);
--- End diff --

Should we also do the TTL check for `original.mergeNamespaces()`? We need to query the state when merging namespaces.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996094
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable<TtlValue<T>> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+   List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List<T> getInternal() throws Exception {
+   return collect(get());
--- End diff --

This looks a bit weird; my gut feeling is that we should call `getInternal()` from `get()` (as we call `updateInternal()` from `update()` in this class), but here it is the reverse.
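I.e. something along these lines (sketch only; `readWithTtlCheck()` is a hypothetical helper holding the logic that currently lives in `get()`):

```java
@Override
public List<T> getInternal() throws Exception {
	// do the actual read + TTL filtering / ts update here
	return collect(readWithTtlCheck());
}

@Override
public Iterable<T> get() throws Exception {
	return getInternal();
}
```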


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996839
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
--- End diff --

In this block we need to iterate `ttlValue` twice, once for `collect()` and once for `updateTs()`. If we made `updateTs()` accept an `Iterable` as its argument, we could avoid the `collect()` here; that way we only need to iterate `ttlValue` once.
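E.g. (just a sketch; note it returns the collected list so `get()` can reuse it, and we pass over the values only once):

```java
private List<TtlValue<T>> updateTs(Iterable<TtlValue<T>> ttlValues) throws Exception {
	// single pass: filter expired entries and refresh timestamps
	List<TtlValue<T>> unexpiredWithUpdatedTs = StreamSupport
		.stream(ttlValues.spliterator(), false)
		.filter(v -> !expired(v))
		.map(this::rewrapWithNewTs)
		.collect(Collectors.toList());
	if (!unexpiredWithUpdatedTs.isEmpty()) {
		original.update(unexpiredWithUpdatedTs);
	}
	return unexpiredWithUpdatedTs;
}
```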


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196995095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
--- End diff --

Maybe we should also check that the `ttl` is greater than 0?
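E.g. something like:

```java
Preconditions.checkNotNull(ttl);
Preconditions.checkArgument(ttl.toMilliseconds() > 0,
	"TTL is expected to be positive.");
```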


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196827863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable<TtlValue<T>> finalResult = ttlValue;
--- End diff --

Oh sorry, my bad, I misunderstood...


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196820474
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+   }
+
+   <V> TtlValue<V> wrapWithTs(V value) {
+   return wrapWithTs(value, newExpirationTimestamp());
+   }
+
+   static <V> TtlValue<V> wrapWithTs(V value, long ts) {
+   return value == null ? null : new TtlValue<>(value, ts);
+   }
+
+   <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
+   return wrapWithTs(ttlValue.getUserValue());
+   }
+
+   private long newExpirationTimestamp() {
+   long currentTs = timeProvider.currentTimestamp();
+   long ttl = config.getTtl().toMilliseconds();
--- End diff --

This will be called very often, so does it make sense to introduce a field to remember `config.getTtl().toMilliseconds()`?
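E.g. (sketch, the field name is just an example):

```java
// cache the conversion once in the constructor:
// this.ttlMillis = config.getTtl().toMilliseconds();
private final long ttlMillis;

private long newExpirationTimestamp() {
	return timeProvider.currentTimestamp() + ttlMillis;
}
```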


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196809755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+   }
+
+   <V> TtlValue<V> wrapWithTs(V value) {
+   return wrapWithTs(value, newExpirationTimestamp());
+   }
+
+   static <V> TtlValue<V> wrapWithTs(V value, long ts) {
+   return value == null ? null : new TtlValue<>(value, ts);
+   }
+
+   <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
+   return wrapWithTs(ttlValue.getUserValue());
+   }
+
+   private long newExpirationTimestamp() {
+   return timeProvider.currentTimestamp() + config.getTtl().toMilliseconds();
--- End diff --

This will be called very often, so does it make sense to introduce a field to remember `config.getTtl().toMilliseconds()`?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196817846
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable<TtlValue<T>> finalResult = ttlValue;
--- End diff --

The var `finalResult` looks redundant, or maybe I'm misunderstanding something.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196816259
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+   AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+   implements InternalListState<K, N, T> {
+   TtlListState(
+   InternalListState<K, N, TtlValue<T>> originalState,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+   super(originalState, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public void update(List<T> values) throws Exception {
+   updateInternal(values);
+   }
+
+   @Override
+   public void addAll(List<T> values) throws Exception {
+   Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+   original.addAll(withTs(values));
+   }
+
+   @Override
+   public Iterable<T> get() throws Exception {
+   Iterable<TtlValue<T>> ttlValue = original.get();
+   ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+   if (updateTsOnRead) {
+   List<TtlValue<T>> collected = collect(ttlValue);
+   ttlValue = collected;
+   updateTs(collected);
+   }
+   final Iterable<TtlValue<T>> finalResult = ttlValue;
+   return () -> new IteratorWithCleanup(finalResult.iterator());
+   }
+
+   private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+   List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+   .filter(v -> !expired(v))
+   .map(this::rewrapWithNewTs)
+   .collect(Collectors.toList());
+   if (!unexpiredWithUpdatedTs.isEmpty()) {
+   original.update(unexpiredWithUpdatedTs);
+   }
+   }
+
+   @Override
+   public void add(T value) throws Exception {
+   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+   original.add(wrapWithTs(value));
+   }
+
+   @Override
+   public void clear() {
+   original.clear();
+   }
+
+   @Override
+   public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+   original.mergeNamespaces(target, sources);
+   }
+
+   @Override
+   public List<T> getInternal() throws Exception {
+   return collect(get());
+   }
+
+   private <E> List<E> collect(Iterable<E> iterable) {
+   return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
+   }
+
+   @Override
+   public void updateInternal(List<T> valueToStore) throws Exception {
+   Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
+   original.addAll(withTs(valueToStore));
--- End diff --

This seems to miss a `clear()`.
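I.e. (sketch):

```java
@Override
public void updateInternal(List<T> valueToStore) throws Exception {
	Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
	original.clear(); // drop the previous list before writing the new one
	original.addAll(withTs(valueToStore));
}
```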


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196791555
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
--- End diff --

Why not also check `ttl`?
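E.g.:

```java
Preconditions.checkNotNull(ttl, "TTL should not be null.");
```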


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196786370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

Does it make sense to never expire the value when `ttlValue.getExpirationTimestamp()` returns a negative value?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196784275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+   T original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider) {
+   Preconditions.checkNotNull(original);
+   Preconditions.checkNotNull(config);
+   Preconditions.checkNotNull(timeProvider);
+   Preconditions.checkArgument(config.getTtlUpdateType() != 
TtlUpdateType.Disabled,
+   "State does not need to be wrapped with TTL if it is 
configured as disabled.");
+   this.original = original;
+   this.config = config;
+   this.timeProvider = timeProvider;
+   this.updateTsOnRead = config.getTtlUpdateType() == 
TtlUpdateType.OnReadAndWrite;
+   this.returnExpired = config.getStateVisibility() == 
TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+   return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+   return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

This looks a bit problematic, because `ttlValue.getExpirationTimestamp()` might be negative. E.g. when the user provides `Long.MAX_VALUE` as the TTL, they expect the value to never expire, but according to the current code it will expire immediately (the timestamp addition overflows).
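One way to handle it (sketch; relies on the TTL being positive):

```java
private long newExpirationTimestamp() {
	long currentTs = timeProvider.currentTimestamp();
	long ttl = config.getTtl().toMilliseconds();
	long expirationTs = currentTs + ttl;
	// if the addition overflowed, clamp to Long.MAX_VALUE,
	// so TTL = Long.MAX_VALUE effectively means "never expires"
	return expirationTs < currentTs ? Long.MAX_VALUE : expirationTs;
}
```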


---


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-06-19 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Could anybody have a look at this?


---


[GitHub] flink issue #6185: [FLINK-9619][YARN] Eagerly close the connection with task...

2018-06-19 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6185
  
CC @tillrohrmann 


---


[GitHub] flink pull request #6185: [FLINK-9619][YARN] Eagerly close the connection wi...

2018-06-19 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6185

[FLINK-9619][YARN] Eagerly close the connection with task manager when the 
container is completed

## What is the purpose of the change

*We should always eagerly close the connection with the task manager when the container is completed.*


## Brief change log

  - *Eagerly close the connection with the task manager when the container is completed*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - No


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9619

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6185


commit a667ea120b0c2519ce45e5919c1377e897897d17
Author: sihuazhou 
Date:   2018-06-20T04:41:05Z

Eagerly close the connection with task manager when the container is 
completed.




---


[GitHub] flink pull request #6151: [FLINK-9569] [avro] Fix confusing construction of ...

2018-06-19 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6151#discussion_r196366033
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -105,41 +108,54 @@
/** The currently accessing thread, set and checked on debug level 
only. */
private transient volatile Thread currentThread;
 
-   // 

+   // --- instantiation methods 
--
 
/**
 * Creates a new AvroSerializer for the type indicated by the given 
class.
-* This constructor is intended to be used with {@link SpecificRecord} 
or reflection serializer.
-* For serializing {@link GenericData.Record} use {@link 
AvroSerializer#AvroSerializer(Class, Schema)}
+*
+* This constructor is expected to be used only with {@link 
GenericRecord}.
+* For {@link SpecificRecord} or reflection serializer use {@link 
AvroSerializer#forNonGeneric(Class)}.
+*
+* @param schema the explicit schema to use for generic records.
 */
-   public AvroSerializer(Class<T> type) {
-   checkArgument(!isGenericRecord(type),
-   "For GenericData.Record use constructor with explicit 
schema.");
-   this.type = checkNotNull(type);
-   this.schemaString = null;
+   public static AvroSerializer<GenericRecord> forGeneric(Schema schema) {
+   return new AvroSerializer<>(GenericRecord.class, schema);
--- End diff --

Should we add a check here to make sure `schema` is not null?
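E.g.:

```java
public static AvroSerializer<GenericRecord> forGeneric(Schema schema) {
	return new AvroSerializer<>(GenericRecord.class,
		checkNotNull(schema, "Schema must not be null."));
}
```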


---


[GitHub] flink issue #6088: [FLINK-9417][ Distributed Coordination] Send heartbeat re...

2018-06-18 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6088
  
cc @tillrohrmann 


---


[GitHub] flink issue #6174: [FLINK-9601][state]Try to reuse the snapshotData array as...

2018-06-16 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6174
  
@StefanRRichter Thanks for the review, I think that makes a lot of sense, 
will change it according to your suggestion.


---


[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

2018-06-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6173#discussion_r195903383
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -110,7 +123,7 @@
/**
 * Map of state names to their corresponding restored state meta info.
 *
-* 
+* 
--- End diff --

I think the `<p>` change might need to be reverted.


---


[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

2018-06-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6173#discussion_r195903191
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 ---
@@ -244,7 +271,14 @@ public ExecutionConfig 
setExecutionConfig(ExecutionConfig config) {
return getKvState(jobId, queryableStateName, key.hashCode(), 
serializedKeyAndNamespace).thenApply(
stateResponse -> {
try {
-   return stateDescriptor.bind(new 
ImmutableStateBinder(stateResponse.getContent()));
+   if 
(!STATE_FACTORIES.containsKey(stateDescriptor.getClass())) {
+   String message = 
String.format("State %s is not supported by %s",
+   
stateDescriptor.getClass(), this.getClass());
+   throw new 
FlinkRuntimeException(message);
+   }
+   return STATE_FACTORIES
+   
.get(stateDescriptor.getClass())
--- End diff --

Maybe we can merge the `containsKey()` and the `get()` into a single `get()`; this way we don't need to query `STATE_FACTORIES` twice.
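I.e. (sketch, assuming the map values are the new `StateFactory` introduced by this PR):

```java
// single lookup instead of containsKey() + get()
StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getClass());
if (stateFactory == null) {
	String message = String.format("State %s is not supported by %s",
		stateDescriptor.getClass(), this.getClass());
	throw new FlinkRuntimeException(message);
}
return stateFactory.createState(...); // as in the current code
```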


---


[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

2018-06-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6173#discussion_r195903638
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1303,103 +1316,18 @@ private ColumnFamilyHandle 
createColumnFamily(String stateName) throws IOExcepti
}
 
@Override
-   protected  InternalValueState createValueState(
-   TypeSerializer namespaceSerializer,
-   ValueStateDescriptor stateDesc) throws Exception {
-
-   Tuple2> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBValueState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   this);
-   }
-
-   @Override
-   protected  InternalListState createListState(
-   TypeSerializer namespaceSerializer,
-   ListStateDescriptor stateDesc) throws Exception {
-
-   Tuple2>> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBListState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getElementSerializer(),
-   this);
-   }
-
-   @Override
-   protected  InternalReducingState createReducingState(
-   TypeSerializer namespaceSerializer,
-   ReducingStateDescriptor stateDesc) throws Exception {
-
-   Tuple2> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBReducingState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getReduceFunction(),
-   this);
-   }
-
-   @Override
-   protected  InternalAggregatingState 
createAggregatingState(
-   TypeSerializer namespaceSerializer,
-   AggregatingStateDescriptor stateDesc) throws 
Exception {
-
-   Tuple2> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBAggregatingState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getAggregateFunction(),
-   this);
-   }
-
-   @Override
-   protected  InternalFoldingState 
createFoldingState(
+   public  IS createState(
TypeSerializer namespaceSerializer,
-   FoldingStateDescriptor stateDesc) throws Exception {
-
-   Tuple2> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBFoldingState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getFoldFunction(),
-   this);
-   }
-
-   @Override
-   protected  InternalMapState createMapState(
-   TypeSerializer namespaceSerializer,
-   MapStateDescriptor stateDesc) throws Exception {
-
-   Tuple2>> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBMapState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

2018-06-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6173#discussion_r195903619
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1303,103 +1316,18 @@ private ColumnFamilyHandle 
createColumnFamily(String stateName) throws IOExcepti
}
 
@Override
-   protected  InternalValueState createValueState(
-   TypeSerializer namespaceSerializer,
-   ValueStateDescriptor stateDesc) throws Exception {
-
-   Tuple2> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBValueState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   this);
-   }
-
-   @Override
-   protected  InternalListState createListState(
-   TypeSerializer namespaceSerializer,
-   ListStateDescriptor stateDesc) throws Exception {
-
-   Tuple2>> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBListState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getElementSerializer(),
-   this);
-   }
-
-   @Override
-   protected  InternalReducingState createReducingState(
-   TypeSerializer namespaceSerializer,
-   ReducingStateDescriptor stateDesc) throws Exception {
-
-   Tuple2> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBReducingState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getReduceFunction(),
-   this);
-   }
-
-   @Override
-   protected  InternalAggregatingState 
createAggregatingState(
-   TypeSerializer namespaceSerializer,
-   AggregatingStateDescriptor stateDesc) throws 
Exception {
-
-   Tuple2> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBAggregatingState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getAggregateFunction(),
-   this);
-   }
-
-   @Override
-   protected  InternalFoldingState 
createFoldingState(
+   public  IS createState(
TypeSerializer namespaceSerializer,
-   FoldingStateDescriptor stateDesc) throws Exception {
-
-   Tuple2> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBFoldingState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getFoldFunction(),
-   this);
-   }
-
-   @Override
-   protected  InternalMapState createMapState(
-   TypeSerializer namespaceSerializer,
-   MapStateDescriptor stateDesc) throws Exception {
-
-   Tuple2>> registerResult =
-   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
-
-   return new RocksDBMapState<>(
-   registerResult.f0,
-   registerResult.f1.getNamespaceSerializer(),
-   registerResult.f1.getStateSerializer(),
-   stateDesc.getDefaultValue(),
-   

[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

2018-06-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6173#discussion_r195903979
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java ---
@@ -23,7 +23,10 @@
 /**
  * The {@code StateBinder} is used by {@link StateDescriptor} instances to 
create actual
  * {@link State} objects.
+ *
+ * @deprecated refactored to StateFactory in flink-runtime
  */
+@Deprecated
 @Internal
 public interface StateBinder {
--- End diff --

Since it is internal, how about just removing it?


---


[GitHub] flink pull request #6173: [FLINK-9571] Refactor StateBinder to internal back...

2018-06-16 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6173#discussion_r195903433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -203,91 +216,16 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public  InternalValueState createValueState(
-   TypeSerializer namespaceSerializer,
-   ValueStateDescriptor stateDesc) throws Exception {
-
-   StateTable stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
-   return new HeapValueState<>(
-   stateTable,
-   keySerializer,
-   stateTable.getStateSerializer(),
-   stateTable.getNamespaceSerializer(),
-   stateDesc.getDefaultValue());
-   }
-
-   @Override
-   public  InternalListState createListState(
-   TypeSerializer namespaceSerializer,
-   ListStateDescriptor stateDesc) throws Exception {
-
-   StateTable> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
-   return new HeapListState<>(
-   stateTable,
-   keySerializer,
-   stateTable.getStateSerializer(),
-   stateTable.getNamespaceSerializer(),
-   stateDesc.getDefaultValue());
-   }
-
-   @Override
-   public  InternalReducingState createReducingState(
-   TypeSerializer namespaceSerializer,
-   ReducingStateDescriptor stateDesc) throws Exception {
-
-   StateTable stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
-   return new HeapReducingState<>(
-   stateTable,
-   keySerializer,
-   stateTable.getStateSerializer(),
-   stateTable.getNamespaceSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getReduceFunction());
-   }
-
-   @Override
-   public  InternalAggregatingState 
createAggregatingState(
-   TypeSerializer namespaceSerializer,
-   AggregatingStateDescriptor stateDesc) throws 
Exception {
-
-   StateTable stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
-   return new HeapAggregatingState<>(
-   stateTable,
-   keySerializer,
-   stateTable.getStateSerializer(),
-   stateTable.getNamespaceSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getAggregateFunction());
-   }
-
-   @Override
-   public  InternalFoldingState 
createFoldingState(
-   TypeSerializer namespaceSerializer,
-   FoldingStateDescriptor stateDesc) throws 
Exception {
-
-   StateTable stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
-   return new HeapFoldingState<>(
-   stateTable,
-   keySerializer,
-   stateTable.getStateSerializer(),
-   stateTable.getNamespaceSerializer(),
-   stateDesc.getDefaultValue(),
-   stateDesc.getFoldFunction());
-   }
-
-   @Override
-   protected  InternalMapState createMapState(
-   TypeSerializer namespaceSerializer,
-   MapStateDescriptor stateDesc) throws Exception {
-
-   StateTable> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
-
-   return new HeapMapState<>(
-   stateTable,
-   keySerializer,
-   stateTable.getStateSerializer(),
-   stateTable.getNamespaceSerializer(),
-   stateDesc.getDefaultValue());
+   public  IS createState(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception {
+   if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
+   String message = String.format("State %s is not 
supported by %s",
+   stateDesc.getClass(), this.getClass());
+   throw new FlinkRuntimeException(message);
   

[GitHub] flink issue #6174: [FLINK-9601][state]Try to reuse the snapshotData array a...

2018-06-16 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6174
  
CC @StefanRRichter 


---

