flink git commit: [FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints

2017-05-17 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/release-1.3 98f4fad93 -> 8b5ba676f


[FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints

This commit adds the config snapshot of the key serializer of keyed
backends to their checkpoints. This makes it possible to upgrade key
serializers, and, in the future, to migrate state when the old and new
key serializers are incompatible.

This closes #3925.
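
For context, a minimal sketch of how a custom key serializer takes part in
this check through Flink 1.3's serializer-upgrade hooks. The class and names
here (MyKeyConfigSnapshot, MyKey) are hypothetical, not part of this commit;
what the commit actually writes into keyed-backend checkpoints is the result
of keySerializer.snapshotConfiguration():

```java
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;

// Hypothetical config snapshot written for a custom key serializer.
public class MyKeyConfigSnapshot extends TypeSerializerConfigSnapshot {

    public static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof MyKeyConfigSnapshot;
    }

    @Override
    public int hashCode() {
        return getClass().hashCode();
    }
}

// Inside the custom TypeSerializer<MyKey> (sketch):
//
//     @Override
//     public TypeSerializerConfigSnapshot snapshotConfiguration() {
//         return new MyKeyConfigSnapshot();
//     }
//
//     @Override
//     public CompatibilityResult<MyKey> ensureCompatibility(TypeSerializerConfigSnapshot snapshot) {
//         return (snapshot instanceof MyKeyConfigSnapshot)
//             ? CompatibilityResult.compatible()         // restore proceeds
//             : CompatibilityResult.requiresMigration(); // backend aborts, see diff below
//     }
```

On restore, the backend resolves the written snapshot against the new job's
key serializer and aborts if isRequiresMigration() is true, since key-state
migration is not available yet (see the diff below).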


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b5ba676
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b5ba676
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b5ba676

Branch: refs/heads/release-1.3
Commit: 8b5ba676fb3b52f69b0a523d57264462ac73c23f
Parents: 98f4fad
Author: Tzu-Li (Gordon) Tai 
Authored: Wed May 17 01:15:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Wed May 17 22:52:54 2017 +0800

--
 .../state/RocksDBKeyedStateBackend.java | 30 ++
 .../state/KeyedBackendSerializationProxy.java   | 61 ++--
 .../state/heap/HeapKeyedStateBackend.java   | 15 +
 .../runtime/state/SerializationProxiesTest.java | 54 -
 .../runtime/state/StateBackendTestBase.java | 42 ++
 5 files changed, 197 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8b5ba676/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
--
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 4bd94fd..ddc7e17 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1116,6 +1116,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend {
     * @throws ClassNotFoundException
     * @throws RocksDBException
     */
+    @SuppressWarnings("unchecked")
    private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
 
        KeyedBackendSerializationProxy serializationProxy =
@@ -1123,6 +1124,20 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend {
 
        serializationProxy.read(currentStateHandleInView);
 
+        // check for key serializer compatibility; this also reconfigures the
+        // key serializer to be compatible, if it is required and is possible
+        if (StateMigrationUtil.resolveCompatibilityResult(
+                serializationProxy.getKeySerializer(),
+                TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+                serializationProxy.getKeySerializerConfigSnapshot(),
+                (TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
+            .isRequiresMigration()) {
+
+            // TODO replace with state migration; note that key hash codes need to remain the same after migration
+            throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+                "Aborting now since state migration is currently not available");
+        }
+
        List restoredMetaInfos =
            serializationProxy.getStateMetaInfoSnapshots();
 
@@ -1214,6 +1229,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend {
        this.stateBackend = stateBackend;
    }
 
+    @SuppressWarnings("unchecked")
    private List readMetaData(
            StreamStateHandle metaStateHandle) throws Exception {
 
@@ -1228,6 +1244,20 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend {
        DataInputView in = new DataInputViewStreamWrapper(inputStream);
        serializationProxy.read(in);
 
+        // check for key serializer compatibility;

flink git commit: [FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints

2017-05-17 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/master 2bfead7d9 -> d8a467b01


[FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints

This commit adds the config snapshot of the key serializer of keyed
backends to their checkpoints. This makes it possible to upgrade key
serializers, and, in the future, to migrate state when the old and new
key serializers are incompatible.

This closes #3925.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8a467b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8a467b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8a467b0

Branch: refs/heads/master
Commit: d8a467b01ab63127dbf563b6aa8c68fe5d9c85d4
Parents: 2bfead7
Author: Tzu-Li (Gordon) Tai 
Authored: Wed May 17 01:15:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Wed May 17 22:46:51 2017 +0800

--
 .../state/RocksDBKeyedStateBackend.java | 30 ++
 .../state/KeyedBackendSerializationProxy.java   | 61 ++--
 .../state/heap/HeapKeyedStateBackend.java   | 15 +
 .../runtime/state/SerializationProxiesTest.java | 54 -
 .../runtime/state/StateBackendTestBase.java | 42 ++
 5 files changed, 197 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
--
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 4bd94fd..ddc7e17 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1116,6 +1116,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend {
     * @throws ClassNotFoundException
     * @throws RocksDBException
     */
+    @SuppressWarnings("unchecked")
    private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
 
        KeyedBackendSerializationProxy serializationProxy =
@@ -1123,6 +1124,20 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend {
 
        serializationProxy.read(currentStateHandleInView);
 
+        // check for key serializer compatibility; this also reconfigures the
+        // key serializer to be compatible, if it is required and is possible
+        if (StateMigrationUtil.resolveCompatibilityResult(
+                serializationProxy.getKeySerializer(),
+                TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+                serializationProxy.getKeySerializerConfigSnapshot(),
+                (TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
+            .isRequiresMigration()) {
+
+            // TODO replace with state migration; note that key hash codes need to remain the same after migration
+            throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+                "Aborting now since state migration is currently not available");
+        }
+
        List restoredMetaInfos =
            serializationProxy.getStateMetaInfoSnapshots();
 
@@ -1214,6 +1229,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend {
        this.stateBackend = stateBackend;
    }
 
+    @SuppressWarnings("unchecked")
    private List readMetaData(
            StreamStateHandle metaStateHandle) throws Exception {
 
@@ -1228,6 +1244,20 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend {
        DataInputView in = new DataInputViewStreamWrapper(inputStream);
        serializationProxy.read(in);
 
+        // check for key serializer compatibility; this

[1/3] flink git commit: [FLINK-6598] [table] Remove unused parameter from DataStreamGroupAggregate.

2017-05-17 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.3 7a045f204 -> 98f4fad93


[FLINK-6598] [table] Remove unused parameter from DataStreamGroupAggregate.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/056d9553
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/056d9553
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/056d9553

Branch: refs/heads/release-1.3
Commit: 056d9553d713834d18c1b3490a6e3f106129a9ef
Parents: 7a045f2
Author: sunjincheng121 
Authored: Tue May 16 19:08:11 2017 +0800
Committer: Fabian Hueske 
Committed: Wed May 17 15:29:40 2017 +0200

--
 .../table/plan/nodes/datastream/DataStreamGroupAggregate.scala| 3 ---
 .../plan/rules/datastream/DataStreamGroupAggregateRule.scala  | 1 -
 2 files changed, 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/056d9553/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index e5d8088..d54c04b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory
   * @param traitSet        Trait set of the RelNode
   * @param inputNode       The input RelNode of aggregation
   * @param namedAggregates List of calls to aggregate functions and their output field names
-  * @param rowRelDataType  The type of the rows of the RelNode
   * @param inputSchema     The type of the rows consumed by this RelNode
   * @param schema          The type of the rows emitted by this RelNode
   * @param groupings       The position (in the input Row) of the grouping keys
@@ -53,7 +52,6 @@ class DataStreamGroupAggregate(
 traitSet: RelTraitSet,
 inputNode: RelNode,
 namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-rowRelDataType: RelDataType,
 schema: RowSchema,
 inputSchema: RowSchema,
 groupings: Array[Int])
@@ -79,7 +77,6 @@ class DataStreamGroupAggregate(
   traitSet,
   inputs.get(0),
   namedAggregates,
-  getRowType,
   schema,
   inputSchema,
   groupings)

http://git-wip-us.apache.org/repos/asf/flink/blob/056d9553/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
index fd7619c..0b8e411 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
@@ -68,7 +68,6 @@ class DataStreamGroupAggregateRule
   traitSet,
   convInput,
   agg.getNamedAggCalls,
-  rel.getRowType,
   new RowSchema(rel.getRowType),
   new RowSchema(agg.getInput.getRowType),
   agg.getGroupSet.toArray)



[2/3] flink git commit: [FLINK-6583] [table] Add state cleanup for counting GroupWindows.

2017-05-17 Thread fhueske
[FLINK-6583] [table] Add state cleanup for counting GroupWindows.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36cac0fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36cac0fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36cac0fb

Branch: refs/heads/release-1.3
Commit: 36cac0fb64ade6debc66f06e00d4184cdc0fba5f
Parents: 056d955
Author: sunjincheng121 
Authored: Tue May 16 11:58:37 2017 +0800
Committer: Fabian Hueske 
Committed: Wed May 17 15:29:43 2017 +0200

--
 .../DataStreamGroupWindowAggregate.scala|  32 +++-
 .../triggers/StateCleaningCountTrigger.scala| 136 +
 .../table/GroupWindowAggregationsITCase.scala   |  10 +-
 .../StateCleaningCountTriggerHarnessTest.scala  | 147 +++
 4 files changed, 316 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/36cac0fb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c158579..1ac013a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -25,7 +25,8 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow}
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -40,7 +41,8 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
+import org.slf4j.LoggerFactory
 
 class DataStreamGroupWindowAggregate(
     window: LogicalWindow,
@@ -54,6 +56,8 @@ class DataStreamGroupWindowAggregate(
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
 
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
   override def deriveRowType(): RelDataType = schema.logicalType
 
   override def needsUpdatesAsRetraction = true
@@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
           "non-windowed GroupBy aggregation.")
     }
 
+    val isCountWindow = window match {
+      case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
+      case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
+      case _ => false
+    }
+
+    if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+        "Please provide a query configuration with valid retention interval to prevent excessive " +
+        "state size. You may specify a retention time of 0 to not clean up the state.")
+    }
+
     val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
 
     val aggString = aggregationToString(
@@ -167,7 +184,7 @@ class DataStreamGroupWindowAggregate(
 
       val keyedStream = inputDS.keyBy(physicalGrouping: _*)
       val windowedStream =
-        createKeyedWindowedStream(window, keyedStream)
+        createKeyedWindowedStream(queryConfig, window, keyedStream)
           .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
@@ -192,7 +209,7 @@ class DataStreamGroupWindowAggregate(
         physicalNamedProperties)
 
       val windowedStream =
-

[3/3] flink git commit: [FLINK-6589] [core] Deserialize ArrayList with capacity of size+1 to prevent growth.

2017-05-17 Thread fhueske
[FLINK-6589] [core] Deserialize ArrayList with capacity of size+1 to prevent 
growth.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98f4fad9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98f4fad9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98f4fad9

Branch: refs/heads/release-1.3
Commit: 98f4fad93263b05f0e55562ddd81385430546225
Parents: 36cac0f
Author: Fabian Hueske 
Authored: Mon May 15 21:41:51 2017 +0200
Committer: Fabian Hueske 
Committed: Wed May 17 15:29:46 2017 +0200

--
 .../apache/flink/api/common/typeutils/base/ListSerializer.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/98f4fad9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index aa9808e..1f271fe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -82,7 +82,7 @@ public final class ListSerializer extends TypeSerializer {
    @Override
    public TypeSerializer duplicate() {
        TypeSerializer duplicateElement = elementSerializer.duplicate();
-        return duplicateElement == elementSerializer ? this : new ListSerializer(duplicateElement);
+        return duplicateElement == elementSerializer ? this : new ListSerializer<>(duplicateElement);
    }
 
    @Override
@@ -129,7 +129,8 @@ public final class ListSerializer extends TypeSerializer {
    @Override
    public List deserialize(DataInputView source) throws IOException {
        final int size = source.readInt();
-        final List list = new ArrayList<>(size);
+        // create new list with (size + 1) capacity to prevent expensive growth when a single element is added
+        final List list = new ArrayList<>(size + 1);
        for (int i = 0; i < size; i++) {
            list.add(elementSerializer.deserialize(source));
        }



[1/3] flink git commit: [FLINK-6589] [core] Deserialize ArrayList with capacity of size+1 to prevent growth.

2017-05-17 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master b6afc06ab -> 2bfead7d9


[FLINK-6589] [core] Deserialize ArrayList with capacity of size+1 to prevent 
growth.

This closes #3912.
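
The rationale in miniature (plain JDK behavior, not Flink code): an ArrayList
created with exactly `size` capacity is full once deserialization finishes, so
the very next add() — the common case for ListState — forces a grow-and-copy.

```java
import java.util.ArrayList;
import java.util.List;

// An ArrayList sized exactly to its contents grows (~1.5x capacity plus an
// array copy) as soon as one more element arrives:
List<String> exact = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
    exact.add("e" + i);      // fills the initial capacity, no growth yet
}
exact.add("one more");       // triggers the expensive grow-and-copy

// The patched serializer leaves headroom for that single element:
List<String> padded = new ArrayList<>(4 + 1);
```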


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bfead7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bfead7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bfead7d

Branch: refs/heads/master
Commit: 2bfead7d9bef51713ed203fa7979f71f23525733
Parents: d85d969
Author: Fabian Hueske 
Authored: Mon May 15 21:41:51 2017 +0200
Committer: Fabian Hueske 
Committed: Wed May 17 15:24:23 2017 +0200

--
 .../apache/flink/api/common/typeutils/base/ListSerializer.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2bfead7d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index aa9808e..1f271fe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -82,7 +82,7 @@ public final class ListSerializer extends TypeSerializer {
    @Override
    public TypeSerializer duplicate() {
        TypeSerializer duplicateElement = elementSerializer.duplicate();
-        return duplicateElement == elementSerializer ? this : new ListSerializer(duplicateElement);
+        return duplicateElement == elementSerializer ? this : new ListSerializer<>(duplicateElement);
    }
 
    @Override
@@ -129,7 +129,8 @@ public final class ListSerializer extends TypeSerializer {
    @Override
    public List deserialize(DataInputView source) throws IOException {
        final int size = source.readInt();
-        final List list = new ArrayList<>(size);
+        // create new list with (size + 1) capacity to prevent expensive growth when a single element is added
+        final List list = new ArrayList<>(size + 1);
        for (int i = 0; i < size; i++) {
            list.add(elementSerializer.deserialize(source));
        }



[2/3] flink git commit: [FLINK-6583] [table] Add state cleanup for counting GroupWindows.

2017-05-17 Thread fhueske
[FLINK-6583] [table] Add state cleanup for counting GroupWindows.

This closes #3919.
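
A sketch of the trigger's idea, not the actual StateCleaningCountTrigger
source (class and field names here are assumed; the real implementation also
keeps the registered cleanup timestamp in state so that a stale timer cannot
purge a still-live window):

```java
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires every maxCount elements; if a key stays idle longer than the
// retention interval, a processing-time timer purges its window state.
public class CountTriggerWithCleanup extends Trigger<Object, GlobalWindow> {

    private final long maxCount;
    private final long retentionMillis; // assumed: from queryConfig.getMinIdleStateRetentionTime()

    private final ReducingStateDescriptor<Long> countDesc =
        new ReducingStateDescriptor<>("count", (a, b) -> a + b, LongSerializer.INSTANCE);

    public CountTriggerWithCleanup(long maxCount, long retentionMillis) {
        this.maxCount = maxCount;
        this.retentionMillis = retentionMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long ts, GlobalWindow window, TriggerContext ctx) throws Exception {
        // every element pushes the cleanup deadline for this key further out
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + retentionMillis);

        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        // simplification: the real trigger only purges when this timer matches
        // the most recently registered cleanup time for the key
        return TriggerResult.PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
    }
}
```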


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d85d9693
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d85d9693
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d85d9693

Branch: refs/heads/master
Commit: d85d969334e89d83aec60f9bb3d2c69a4701eb54
Parents: 64d3ce8
Author: sunjincheng121 
Authored: Tue May 16 11:58:37 2017 +0800
Committer: Fabian Hueske 
Committed: Wed May 17 15:24:23 2017 +0200

--
 .../DataStreamGroupWindowAggregate.scala|  32 +++-
 .../triggers/StateCleaningCountTrigger.scala| 136 +
 .../table/GroupWindowAggregationsITCase.scala   |  10 +-
 .../StateCleaningCountTriggerHarnessTest.scala  | 147 +++
 4 files changed, 316 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
--
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index c158579..1ac013a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -25,7 +25,8 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
+import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow}
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
@@ -40,7 +41,8 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
+import org.slf4j.LoggerFactory
 
 class DataStreamGroupWindowAggregate(
     window: LogicalWindow,
@@ -54,6 +56,8 @@ class DataStreamGroupWindowAggregate(
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
 
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
   override def deriveRowType(): RelDataType = schema.logicalType
 
   override def needsUpdatesAsRetraction = true
@@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate(
           "non-windowed GroupBy aggregation.")
     }
 
+    val isCountWindow = window match {
+      case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true
+      case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true
+      case _ => false
+    }
+
+    if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+        "Please provide a query configuration with valid retention interval to prevent excessive " +
+        "state size. You may specify a retention time of 0 to not clean up the state.")
+    }
+
     val outRowType = CRowTypeInfo(schema.physicalTypeInfo)
 
     val aggString = aggregationToString(
@@ -167,7 +184,7 @@ class DataStreamGroupWindowAggregate(
 
       val keyedStream = inputDS.keyBy(physicalGrouping: _*)
      val windowedStream =
-        createKeyedWindowedStream(window, keyedStream)
+        createKeyedWindowedStream(queryConfig, window, keyedStream)
           .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
@@ -192,7 +209,7 @@ class DataStreamGroupWindowAggregate(
         physicalNamedProperties)
 
       val windowedStream =
-

flink git commit: [hotfix] Add configuration notice to HistoryServer overview

2017-05-17 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.3 849dd9d85 -> 7a045f204


[hotfix] Add configuration notice to HistoryServer overview


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a045f20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a045f20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a045f20

Branch: refs/heads/release-1.3
Commit: 7a045f20432cbaf1dcfa70206004fbee3a91d875
Parents: 849dd9d
Author: twalthr 
Authored: Wed May 17 15:10:21 2017 +0200
Committer: twalthr 
Committed: Wed May 17 15:11:44 2017 +0200

--
 docs/monitoring/historyserver.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7a045f20/docs/monitoring/historyserver.md
--
diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md
index fd957f2..f5f0019 100644
--- a/docs/monitoring/historyserver.md
+++ b/docs/monitoring/historyserver.md
@@ -33,7 +33,7 @@ Furthermore, it exposes a REST API that accepts HTTP requests and responds with
 
 The HistoryServer allows you to query the status and statistics of completed jobs that have been archived by a JobManager.
 
-You start and stop the HistoryServer via its corresponding startup script:
+After you have configured the HistoryServer *and* JobManager, you start and stop the HistoryServer via its corresponding startup script:
 
 ```sh
 # Start or stop the HistoryServer

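The configuration the new sentence points to is roughly this pair of
flink-conf.yaml settings (paths and values below are example placeholders;
the keys are the documented JobManager/HistoryServer options):

```yaml
# JobManager: where archives of completed jobs are written
jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# HistoryServer: which archive directory to poll, how often (ms), and the web port
historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.archive.fs.refresh-interval: 10000
historyserver.web.port: 8082
```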


flink git commit: [hotfix] Add configuration notice to HistoryServer overview

2017-05-17 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 00ce3f1b1 -> b6afc06ab


[hotfix] Add configuration notice to HistoryServer overview


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6afc06a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6afc06a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6afc06a

Branch: refs/heads/master
Commit: b6afc06ab1b5c227254941a512912f830da62b5d
Parents: 00ce3f1
Author: twalthr 
Authored: Wed May 17 15:10:21 2017 +0200
Committer: twalthr 
Committed: Wed May 17 15:10:21 2017 +0200

--
 docs/monitoring/historyserver.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b6afc06a/docs/monitoring/historyserver.md
--
diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md
index fd957f2..f5f0019 100644
--- a/docs/monitoring/historyserver.md
+++ b/docs/monitoring/historyserver.md
@@ -33,7 +33,7 @@ Furthermore, it exposes a REST API that accepts HTTP requests and responds with
 
 The HistoryServer allows you to query the status and statistics of completed jobs that have been archived by a JobManager.
 
-You start and stop the HistoryServer via its corresponding startup script:
+After you have configured the HistoryServer *and* JobManager, you start and stop the HistoryServer via its corresponding startup script:
 
 ```sh
 # Start or stop the HistoryServer



[2/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map<String, List>.

2017-05-17 Thread kkloudas
[FLINK-6371] [cep] NFA return matched patterns as Map<String, List>.
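
In practice the new shape looks like this (a hedged sketch; Event, Alert, the
variable patternStream, and the stage names are placeholders, not part of this
commit). Each pattern name maps to a list because looping patterns such as
oneOrMore() can bind several events to one name:

```java
// Assumes a PatternStream<Event> named patternStream whose pattern has
// stages named "start" and "middle".
DataStream<Alert> alerts = patternStream.select(
    new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, List<Event>> match) {
            Event first = match.get("start").get(0);    // non-looping stage: one-element list
            List<Event> repeated = match.get("middle"); // looping stage: all bound events
            return new Alert(first, repeated.size());
        }
    });
```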


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa64a60f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa64a60f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa64a60f

Branch: refs/heads/release-1.3
Commit: fa64a60ff9229cd1c7723d95b8a1bf1a1eb2bd63
Parents: fe1316b
Author: kl0u 
Authored: Fri May 5 13:55:07 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:40:23 2017 +0200

--
 .../org/apache/flink/cep/CEPLambdaTest.java |  11 +-
 .../apache/flink/cep/scala/PatternStream.scala  |  31 +-
 ...StreamScalaJavaAPIInteroperabilityTest.scala |  33 +-
 .../flink/cep/PatternFlatSelectFunction.java|   3 +-
 .../flink/cep/PatternFlatTimeoutFunction.java   |   3 +-
 .../apache/flink/cep/PatternSelectFunction.java |   3 +-
 .../org/apache/flink/cep/PatternStream.java |  29 +-
 .../flink/cep/PatternTimeoutFunction.java   |   3 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 109 +---
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  10 +-
 .../flink/cep/operator/CEPOperatorUtils.java|  19 +-
 .../cep/operator/KeyedCEPPatternOperator.java   |  17 +-
 .../TimeoutKeyedCEPPatternOperator.java |  23 +-
 .../java/org/apache/flink/cep/CEPITCase.java|  69 +--
 .../org/apache/flink/cep/nfa/NFAITCase.java | 608 +++
 .../java/org/apache/flink/cep/nfa/NFATest.java  |  62 +-
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  17 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   1 -
 .../cep/operator/CEPFrom12MigrationTest.java|  57 +-
 .../cep/operator/CEPMigration11to13Test.java|  21 +-
 .../flink/cep/operator/CEPOperatorTest.java |  41 +-
 .../flink/cep/operator/CEPRescalingTest.java|  31 +-
 22 files changed, 474 insertions(+), 727 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
--
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
index 5957158..03fb3c6 100644
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -24,10 +24,13 @@ import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.*;
@@ -41,6 +44,7 @@ public class CEPLambdaTest extends TestLogger {
     * Tests that a Java8 lambda can be passed as a CEP select function
     */
    @Test
+    @Ignore
    public void testLambdaSelectFunction() {
        TypeInformation eventTypeInformation = TypeExtractor.getForClass(EventA.class);
        TypeInformation outputTypeInformation = TypeExtractor.getForClass(EventB.class);
@@ -59,16 +63,17 @@ public class CEPLambdaTest extends TestLogger {
        PatternStream patternStream = new PatternStream<>(inputStream, dummyPattern);
 
        DataStream result = patternStream.select(
-            map -> new EventB()
+            (Map map) -> new EventB()
        );
 
        assertEquals(outputTypeInformation, result.getType());
    }
 
    /**
-     * Tests that a Java8 labmda can be passed as a CEP flat select function
+     * Tests that a Java8 lambda can be passed as a CEP flat select function
     */
    @Test
+    @Ignore
    public void testLambdaFlatSelectFunction() {
        TypeInformation eventTypeInformation = TypeExtractor.getForClass(EventA.class);
        TypeInformation outputTypeInformation = TypeExtractor.getForClass(EventB.class);
@@ -86,7 +91,7 @@ public class CEPLambdaTest extends TestLogger {
        PatternStream patternStream = new PatternStream<>(inputStream, dummyPattern);
 
        DataStream result = patternStream.flatSelect(
-            (map, collector) -> collector.collect(new EventB())
+            (Map map, Collector collector) -> collector.collect(new EventB())
        );
 
        assertEquals(outputTypeInformation, result.getType());


[6/9] flink git commit: [FLINK-6578] [cep] Fix self-loop handling in SharedBuffer.

2017-05-17 Thread kkloudas
[FLINK-6578] [cep] Fix self-loop handling in SharedBuffer.
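
The gist of the fix as a sketch (names here are assumed illustrations, not the
actual SharedBuffer internals): with a self-looping state, the same
(value, timestamp) pair can legitimately be entered more than once, so lookups
keyed only by those two fields can hit the wrong entry. Threading an int
counter through ComputationState (visible in the diff below) gives each take a
distinct identity:

```java
import java.util.Objects;

// Hypothetical illustration of the keying problem: before, an entry's
// identity was effectively (value, timestamp); the commit adds a counter.
final class EntryKey<T> {
    final T value;
    final long timestamp;
    final int counter; // disambiguates repeated takes of the same event in a self-loop

    EntryKey(T value, long timestamp, int counter) {
        this.value = value;
        this.timestamp = timestamp;
        this.counter = counter;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof EntryKey)) {
            return false;
        }
        EntryKey<?> k = (EntryKey<?>) o;
        return counter == k.counter && timestamp == k.timestamp && Objects.equals(value, k.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, timestamp, counter);
    }
}
```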


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36df9019
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36df9019
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36df9019

Branch: refs/heads/release-1.3
Commit: 36df90196fcbb713175466d4b31574d570890262
Parents: 4560d56
Author: kkloudas 
Authored: Mon May 15 14:33:09 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:40:26 2017 +0200

--
 .../apache/flink/cep/nfa/ComputationState.java  |  15 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  34 +-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 117 +++---
 .../java/org/apache/flink/cep/nfa/State.java|   6 +-
 .../apache/flink/cep/nfa/StateTransition.java   |  21 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java | 364 +++
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  78 ++--
 .../cep/operator/CEPFrom12MigrationTest.java|   3 +
 .../cep/operator/CEPMigration11to13Test.java|   3 +
 .../flink/cep/operator/CEPOperatorTest.java | 201 ++
 10 files changed, 728 insertions(+), 114 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/36df9019/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 08b9b78..44f8f39 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -40,6 +40,8 @@ public class ComputationState {
    // the last taken event
    private final T event;
 
+    private final int counter;
+
    // timestamp of the last taken event
    private final long timestamp;
 
@@ -58,11 +60,13 @@ public class ComputationState {
            final State currentState,
            final State previousState,
            final T event,
+            final int counter,
            final long timestamp,
            final DeweyNumber version,
            final long startTimestamp) {
        this.state = currentState;
        this.event = event;
+        this.counter = counter;
        this.timestamp = timestamp;
        this.version = version;
        this.startTimestamp = startTimestamp;
@@ -70,6 +74,10 @@ public class ComputationState {
        this.conditionContext = new ConditionContext(nfa, this);
    }
 
+    public int getCounter() {
+        return counter;
+    }
+
    public ConditionContext getConditionContext() {
        return conditionContext;
    }
@@ -108,12 +116,12 @@ public class ComputationState {
 
    public static  ComputationState createStartState(final NFA nfa, final State state) {
        Preconditions.checkArgument(state.isStart());
-        return new ComputationState<>(nfa, state, null, null, -1L, new DeweyNumber(1), -1L);
+        return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L);
    }
 
    public static  ComputationState createStartState(final NFA nfa, final State state, final DeweyNumber version) {
        Preconditions.checkArgument(state.isStart());
-        return new ComputationState<>(nfa, state, null, null, -1L, version, -1L);
+        return new ComputationState<>(nfa, state, null, null, 0, -1L, version, -1L);
    }
 
    public static  ComputationState createState(
@@ -121,10 +129,11 @@ public class ComputationState {
            final State currentState,
            final State previousState,
            final T event,
+            final int counter,
            final long timestamp,
            final DeweyNumber version,
            final long startTimestamp) {
-        return new ComputationState<>(nfa, currentState, previousState, event, timestamp, version, startTimestamp);
+        return new ComputationState<>(nfa, currentState, previousState, event, counter, timestamp, version, startTimestamp);
    }
 
    public boolean isStopState() {

http://git-wip-us.apache.org/repos/asf/flink/blob/36df9019/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
--
diff --git 

[4/9] flink git commit: [FLINK-6255] [cep] Remove PatternStream.getSideOutput().

2017-05-17 Thread kkloudas
[FLINK-6255] [cep] Remove PatternStream.getSideOutput().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4560d56c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4560d56c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4560d56c

Branch: refs/heads/release-1.3
Commit: 4560d56c6fdface683116dc2db5d7a4942a8b6e3
Parents: 4f14e53
Author: kl0u 
Authored: Fri May 12 16:01:38 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:40:25 2017 +0200

--
 docs/dev/libs/cep.md| 41 +---
 .../apache/flink/cep/scala/PatternStream.scala  | 35 +--
 .../org/apache/flink/cep/PatternStream.java | 55 +--
 .../AbstractKeyedCEPPatternOperator.java| 24 -
 .../flink/cep/operator/CEPOperatorUtils.java|  9 +-
 .../cep/operator/KeyedCEPPatternOperator.java   |  4 +-
 .../TimeoutKeyedCEPPatternOperator.java |  4 +-
 .../java/org/apache/flink/cep/CEPITCase.java| 98 +---
 .../cep/operator/CEPFrom12MigrationTest.java|  6 --
 .../cep/operator/CEPMigration11to13Test.java|  2 -
 .../flink/cep/operator/CEPOperatorTest.java |  2 -
 .../flink/cep/operator/CEPRescalingTest.java|  1 -
 12 files changed, 13 insertions(+), 268 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/docs/dev/libs/cep.md
--
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index b379615..58e1a0a 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -806,46 +806,7 @@ in event time.
 
 To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
 *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed but they can be redirected to a [side output]
-({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them.
-
-To access the stream of late elements, you first need to specify that you want to get the late data using
-`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do
-so, the late elements will be silently dropped. Then, you can get the side-output stream using the
-`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and providing as argument the output tag used in
-the `.sideOutputLateData(OutputTag)`:
-
-
-
-{% highlight java %}
-final OutputTag lateOutputTag = new OutputTag("late-data"){};
-
-PatternStream patternStream = CEP.pattern(...)
-    .sideOutputLateData(lateOutputTag);
-
-// main output with matches
-DataStream result = patternStream.select(...)
-
-// side output containing the late events
-DataStream lateStream = patternStream.getSideOutput(lateOutputTag);
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-val lateOutputTag = OutputTag[T]("late-data")
-
-val patternStream: PatternStream[T] = CEP.pattern(...)
-    .sideOutputLateData(lateOutputTag)
-
-// main output with matches
-val result = patternStream.select(...)
-
-// side output containing the late events
-val lateStream = patternStream.getSideOutput(lateOutputTag)
-{% endhighlight %}
-
-
+seen watermark. Late elements are not further processed.
 
 ## Examples
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
--
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index d4bc28c..e71439c 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
 import org.apache.flink.cep.pattern.{Pattern => JPattern}
 import org.apache.flink.streaming.api.scala.{asScalaStream, _}
-import org.apache.flink.util.{Collector, OutputTag}
+import org.apache.flink.util.Collector
 import org.apache.flink.types.{Either => FEither}
 import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
 import java.lang.{Long => JLong}
 
-import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.cep.operator.CEPOperatorUtils
 import org.apache.flink.cep.scala.pattern.Pattern

[9/9] flink git commit: [FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs.

2017-05-17 Thread kkloudas
[FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs.
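
Dewey numbers are the dotted versions CEP uses to tell overlapping match
branches apart; from the diff below, addStage() appends a digit and
increase(n) bumps the last digit. A worked sketch of the semantics with plain
arrays (helper names are ours, not Flink's):

```java
import java.util.Arrays;

public class DeweyDemo {
    // "1" -> "1.0": copyOf pads the appended digit with 0
    static int[] addStage(int[] v) {
        return Arrays.copyOf(v, v.length + 1);
    }

    // bump the last digit, e.g. "1.0" -> "1.1" for n = 1
    static int[] increase(int[] v, int n) {
        int[] r = v.clone();
        r[r.length - 1] += n;
        return r;
    }

    public static void main(String[] args) {
        int[] v = {1};
        // Two TAKE edges leave the same computation state. The fix shifts each
        // branch past the TAKE branches still to visit before adding a stage,
        // so the two matches no longer collide on the same version:
        int[] branchA = addStage(increase(v, 1)); // one more TAKE pending -> [2, 0]
        int[] branchB = addStage(increase(v, 0)); // last TAKE             -> [1, 0]
        System.out.println(Arrays.toString(branchA) + " vs " + Arrays.toString(branchB));
    }
}
```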


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/849dd9d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/849dd9d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/849dd9d8

Branch: refs/heads/release-1.3
Commit: 849dd9d8588ec8b1971c2c2b3f0a07f87715741b
Parents: d80af81
Author: Dawid Wysakowicz 
Authored: Wed May 17 09:16:08 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:40:28 2017 +0200

--
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 11 ++-
 .../org/apache/flink/cep/nfa/NFAITCase.java | 82 
 2 files changed, 89 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/849dd9d8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index ab5cd8e..a977a7f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -418,6 +418,7 @@ public class NFA implements Serializable {
        final List edges = outgoingEdges.getEdges();
        int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
        int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
+        int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
 
        final List resultingComputationStates = new ArrayList<>();
        for (StateTransition edge : edges) {
@@ -433,7 +434,9 @@ public class NFA implements Serializable {
                        version = computationState.getVersion().increase(toIncrease);
                    } else {
                        //IGNORE after PROCEED
-                        version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+                        version = computationState.getVersion()
+                            .increase(totalTakeToSkip + ignoreBranchesToVisit)
+                            .addStage();
                        ignoreBranchesToVisit--;
                    }
 
@@ -457,8 +460,8 @@ public class NFA implements Serializable {
 
                    final T previousEvent = computationState.getEvent();
 
-                    final DeweyNumber currentVersion = computationState.getVersion();
-                    final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+                    final DeweyNumber currentVersion = computationState.getVersion().increase(takeBranchesToVisit);
+                    final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage();
                    takeBranchesToVisit--;
 
                    final int counter;
@@ -573,7 +576,7 @@ public class NFA implements Serializable {
    }
 
    private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
-        return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1;
+        return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches);
    }
 
    private OutgoingEdges createDecisionGraph(ComputationState computationState, T event) {

http://git-wip-us.apache.org/repos/asf/flink/blob/849dd9d8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
--
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 012e112..d00bbb7 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -3974,6 +3974,88 @@ public class NFAITCase extends TestLogger {
    }
 
    @Test
+    public void testMultipleTakesVersionCollision() {
+        List inputEvents
[7/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.

2017-05-17 Thread kkloudas
http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
--
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 88a5703..824df2d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -32,9 +32,9 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.net.URL;
@@ -57,7 +57,6 @@ public class CEPMigration11to13Test {
    }
 
    @Test
-    @Ignore
    public void testKeyedCEPOperatorMigratation() throws Exception {
 
        KeySelector keySelector = new KeySelector() {
@@ -136,11 +135,58 @@ public class CEPMigration11to13Test {
        assertEquals(middleEvent, patternMap.get("middle").get(0));
        assertEquals(endEvent, patternMap.get("end").get(0));
 
+        // and now go for a checkpoint with the new serializers
+
+        final Event startEvent1 = new Event(42, "start", 2.0);
+        final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+        final Event endEvent1 = new Event(42, "end", 2.0);
+
+        harness.processElement(new StreamRecord(startEvent1, 21));
+        harness.processElement(new StreamRecord(middleEvent1, 23));
+
+        // simulate snapshot/restore with some elements in internal sorting queue
+        OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+        harness.close();
+
+        harness = new KeyedOneInputStreamOperatorTestHarness<>(
+            new KeyedCEPPatternOperator<>(
+                Event.createTypeSerializer(),
+                false,
+                IntSerializer.INSTANCE,
+                new NFAFactory(),
+                true),
+            keySelector,
+            BasicTypeInfo.INT_TYPE_INFO);
+
+        harness.setup();
+        harness.initializeState(snapshot);
+        harness.open();
+
+        harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+        harness.processWatermark(new Watermark(50));
+
+        result = harness.getOutput();
+
+        // watermark and the result
+        assertEquals(2, result.size());
+
+        Object resultObject1 = result.poll();
+        assertTrue(resultObject1 instanceof StreamRecord);
+        StreamRecord resultRecord1 = (StreamRecord) resultObject1;
+        assertTrue(resultRecord1.getValue() instanceof Map);
+
+        @SuppressWarnings("unchecked")
+        Map patternMap1 = (Map) resultRecord1.getValue();
+
+        assertEquals(startEvent1, patternMap1.get("start").get(0));
+        assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+        assertEquals(endEvent1, patternMap1.get("end").get(0));
+
        harness.close();
    }
 
    @Test
-    @Ignore
    public void testNonKeyedCEPFunctionMigration() throws Exception {
 
        final Event startEvent = new Event(42, "start", 1.0);
@@ -191,7 +237,7 @@ public class CEPMigration11to13Test {
        harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
        harness.processElement(new StreamRecord<>(endEvent, 5));
 
-        harness.processWatermark(new Watermark(Long.MAX_VALUE));
+        harness.processWatermark(new Watermark(20));
 
        ConcurrentLinkedQueue result = harness.getOutput();
 
@@ -210,6 +256,54 @@ public class CEPMigration11to13Test {
        assertEquals(middleEvent, patternMap.get("middle").get(0));
        assertEquals(endEvent, patternMap.get("end").get(0));
 
+

[8/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.

2017-05-17 Thread kkloudas
[FLINK-6604] [cep] Remove java serialization from the library.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80af819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80af819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80af819

Branch: refs/heads/release-1.3
Commit: d80af81972ba5adf291d891881aee26b97ec7a60
Parents: 34a6020
Author: kkloudas 
Authored: Tue May 16 17:07:29 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:40:27 2017 +0200

--
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |  98 ++-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 596 ++-
 .../java/org/apache/flink/cep/nfa/State.java|   2 +
 .../flink/cep/nfa/compiler/NFACompiler.java |  81 ++-
 .../AbstractKeyedCEPPatternOperator.java|  16 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 182 --
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  14 +-
 .../cep/operator/CEPFrom12MigrationTest.java|  99 ++-
 .../cep/operator/CEPMigration11to13Test.java| 102 +++-
 .../flink/cep/operator/CEPOperatorTest.java | 110 +++-
 11 files changed, 1499 insertions(+), 318 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index fd3fafa..3827956 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
@@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable {
        deweyNumber = new int[]{start};
    }
 
-    protected DeweyNumber(int[] deweyNumber) {
-        this.deweyNumber = deweyNumber;
-    }
-
    public DeweyNumber(DeweyNumber number) {
        this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length);
    }
 
+    private DeweyNumber(int[] deweyNumber) {
+        this.deweyNumber = deweyNumber;
+    }
+
    /**
     * Checks whether this dewey number is compatible to the other dewey number.
     *
@@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable {
            return new DeweyNumber(deweyNumber);
        }
    }
+
+    /**
+     * A {@link TypeSerializer} for the {@link DeweyNumber} which serves as a version number.
+     */
+    public static class DeweyNumberSerializer extends TypeSerializerSingleton {
+
+        private static final long serialVersionUID = -5086792497034943656L;
+
+        private final IntSerializer elemSerializer = IntSerializer.INSTANCE;
+
+        @Override
+        public boolean isImmutableType() {
+            return false;
+        }
+
+        @Override
+        public DeweyNumber createInstance() {
+            return new DeweyNumber(1);
+        }
+
+        @Override
+        public DeweyNumber copy(DeweyNumber from) {
+            return new DeweyNumber(from);
+        }
+
+        @Override
+        public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) {
+            return copy(from);
+        }
+
+        @Override
+        public int getLength() {
+            return -1;
+        }
+
+        @Override
+        public void serialize(DeweyNumber record, DataOutputView target) throws IOException {
+            final int size = record.length();
+            target.writeInt(size);
+            for (int i = 0; i < size; i++) {
+                elemSerializer.serialize(record.deweyNumber[i], target);
+            }
+        }
+
+        @Override
+        public DeweyNumber deserialize(DataInputView source) throws IOException {
+            final

[1/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map<String, List>.

2017-05-17 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/release-1.3 fe1316b33 -> 849dd9d85


http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
--
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 2cc67e5..46e2fd4 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.common.primitives.Doubles;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
@@ -156,22 +155,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent: inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			resultingPatterns.addAll(patterns);
-		}
-
-		assertEquals(1, resultingPatterns.size());
-		Map<String, Event> patternMap = resultingPatterns.get(0);
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(startEvent, patternMap.get("start"));
-		assertEquals(middleEvent, patternMap.get("middle"));
-		assertEquals(endEvent, patternMap.get("end"));
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(startEvent, middleEvent, endEvent)
+		));
 	}
 
 	@Test
@@ -202,24 +190,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(1, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(middleEvent1, end)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(middleEvent1, end)
+		));
 	}
 
 	@Test
@@ -252,19 +227,9 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(Sets.newHashSet(), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList());
 	}
 
 	/**
@@ -274,7 +239,6 @@ public class NFAITCase extends TestLogger {
 	@Test
 	public void testSimplePatternWithTimeWindowNFA() {
 		List<StreamRecord<Event>> events = new ArrayList<>();
-		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
 
 		final Event startEvent;
 		final Event middleEvent;
@@ -313,21 +277,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		for (StreamRecord<Event> event: events) {
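The repetition removed above now lives in two shared test helpers. A minimal sketch of what a feedNFA-style helper can look like under the new Map<String, List<Event>> match type (the names feedNFA and compareMaps are taken from the diff; the exact bodies in NFAITCase may differ):

	private List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
		List<List<Event>> resultingPatterns = new ArrayList<>();
		for (StreamRecord<Event> inputEvent : inputEvents) {
			// f0 of the returned tuple carries the completed matches, keyed by pattern name
			Collection<Map<String, List<Event>>> patterns = nfa.process(
				inputEvent.getValue(),
				inputEvent.getTimestamp()).f0;
			for (Map<String, List<Event>> match : patterns) {
				List<Event> flattened = new ArrayList<>();
				for (List<Event> events : match.values()) {
					flattened.addAll(events);
				}
				resultingPatterns.add(flattened);
			}
		}
		return resultingPatterns;
	}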

[3/9] flink git commit: [FLINK-6536] [cep] Improve error message in SharedBuffer::put().

2017-05-17 Thread kkloudas
[FLINK-6536] [cep] Improve error message in SharedBuffer::put().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f14e53b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f14e53b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f14e53b

Branch: refs/heads/release-1.3
Commit: 4f14e53b877eab204d4d970d29f886d8fcc0034b
Parents: fa64a60
Author: kl0u 
Authored: Thu May 11 11:39:00 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:40:24 2017 +0200

--
 .../src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java  | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4f14e53b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 418bd4a..decf577 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -97,10 +97,11 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 		// sanity check whether we've found the previous element
 		if (previousSharedBufferEntry == null && previousValue != null) {
-			throw new IllegalStateException("Could not find previous shared buffer entry with " +
+			throw new IllegalStateException("Could not find previous entry with " +
 				"key: " + previousKey + ", value: " + previousValue + " and timestamp: " +
-				previousTimestamp + ". This can indicate that the element belonging to the previous " +
-				"relation has been already pruned, even though you expect it to be still there.");
+				previousTimestamp + ". This can indicate that either you did not implement " +
+				"the equals() and hashCode() methods of your input elements properly or that " +
+				"the element belonging to that entry has been already pruned.");
 		}
 
 		put(key, value, timestamp, previousSharedBufferEntry, version);
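The rewritten message points at a real pitfall: SharedBuffer looks up the previous entry by key, value and timestamp, so the user's event type must define value-based equality. A minimal sketch of what that means for an input element class (SensorEvent is a hypothetical example, not part of the patch):

	public class SensorEvent {
		private final int id;
		private final String name;

		public SensorEvent(int id, String name) {
			this.id = id;
			this.name = name;
		}

		@Override
		public boolean equals(Object o) {
			if (!(o instanceof SensorEvent)) {
				return false;
			}
			SensorEvent other = (SensorEvent) o;
			return id == other.id && name.equals(other.name);
		}

		@Override
		public int hashCode() {
			return 31 * id + name.hashCode();
		}
	}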



[5/9] flink git commit: [hotfix] [cep] Remove unused keySelector in operator.

2017-05-17 Thread kkloudas
[hotfix] [cep] Remove unused keySelector in operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34a6020f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34a6020f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34a6020f

Branch: refs/heads/release-1.3
Commit: 34a6020ff6d51cd8148366677fadb9317cfaa153
Parents: 36df901
Author: kkloudas 
Authored: Mon May 15 14:49:00 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:40:26 2017 +0200

--
 .../cep/operator/AbstractKeyedCEPPatternOperator.java   |  6 --
 .../org/apache/flink/cep/operator/CEPOperatorUtils.java |  6 --
 .../flink/cep/operator/KeyedCEPPatternOperator.java |  4 +---
 .../cep/operator/TimeoutKeyedCEPPatternOperator.java|  4 +---
 .../flink/cep/operator/CEPFrom12MigrationTest.java  |  6 --
 .../flink/cep/operator/CEPMigration11to13Test.java  |  2 --
 .../org/apache/flink/cep/operator/CEPOperatorTest.java  | 12 
 .../org/apache/flink/cep/operator/CEPRescalingTest.java |  1 -
 8 files changed, 6 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7068bc4..bac21b3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -78,9 +77,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 
 	private final TypeSerializer<IN> inputSerializer;
 
-	// necessary to extract the key from the input elements
-	private final KeySelector<IN, KEY> keySelector;
-
 	// necessary to serialize the set of seen keys
 	private final TypeSerializer<KEY> keySerializer;
 
@@ -112,14 +108,12 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	public AbstractKeyedCEPPatternOperator(
 			final TypeSerializer<IN> inputSerializer,
 			final boolean isProcessingTime,
-			final KeySelector<IN, KEY> keySelector,
 			final TypeSerializer<KEY> keySerializer,
 			final NFACompiler.NFAFactory<IN> nfaFactory,
 			final boolean migratingFromOldKeyedOperator) {
 
 		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
 		this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
-		this.keySelector = Preconditions.checkNotNull(keySelector);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 08424a4..e7b7e65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -63,7 +63,6 @@ public class CEPOperatorUtils {
 			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
 			KeyedStream<T, ?> keyedStream = (KeyedStream<T, ?>) inputStream;
 
-			KeySelector<T, ?> keySelector = keyedStream.getKeySelector();
 			TypeSerializer<?> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
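After this change the operator is constructed with one argument fewer; the key comes from the keyed stream itself. A sketch of the new call shape, mirroring the migration test later in this digest:

	new KeyedCEPPatternOperator<>(
		Event.createTypeSerializer(),
		false,                    // event time
		IntSerializer.INSTANCE,   // key serializer only; no KeySelector parameter anymore
		new NFAFactory(),
		true);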
 
  

[8/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.

2017-05-17 Thread kkloudas
[FLINK-6604] [cep] Remove java serialization from the library.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a54d05e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a54d05e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a54d05e

Branch: refs/heads/master
Commit: 7a54d05ecd33b6dc140a7146f0efa90d64471f47
Parents: f7ebcb0
Author: kkloudas 
Authored: Tue May 16 17:07:29 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:37:34 2017 +0200

--
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |  98 ++-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 596 ++-
 .../java/org/apache/flink/cep/nfa/State.java|   2 +
 .../flink/cep/nfa/compiler/NFACompiler.java |  81 ++-
 .../AbstractKeyedCEPPatternOperator.java|  16 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 182 --
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  14 +-
 .../cep/operator/CEPFrom12MigrationTest.java|  99 ++-
 .../cep/operator/CEPMigration11to13Test.java| 102 +++-
 .../flink/cep/operator/CEPOperatorTest.java | 110 +++-
 11 files changed, 1499 insertions(+), 318 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index fd3fafa..3827956 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
@@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable {
deweyNumber = new int[]{start};
}
 
-   protected DeweyNumber(int[] deweyNumber) {
-   this.deweyNumber = deweyNumber;
-   }
-
	public DeweyNumber(DeweyNumber number) {
		this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length);
	}
 
+   private DeweyNumber(int[] deweyNumber) {
+   this.deweyNumber = deweyNumber;
+   }
+
/**
	 * Checks whether this dewey number is compatible to the other dewey number.
 *
@@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable {
return new DeweyNumber(deweyNumber);
}
}
+
+   /**
+	 * A {@link TypeSerializer} for the {@link DeweyNumber} which serves as a version number.
+	 */
+	public static class DeweyNumberSerializer extends TypeSerializerSingleton<DeweyNumber> {
+
+   private static final long serialVersionUID = 
-5086792497034943656L;
+
+   private final IntSerializer elemSerializer = 
IntSerializer.INSTANCE;
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public DeweyNumber createInstance() {
+   return new DeweyNumber(1);
+   }
+
+   @Override
+   public DeweyNumber copy(DeweyNumber from) {
+   return new DeweyNumber(from);
+   }
+
+   @Override
+   public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) {
+   return copy(from);
+   }
+
+   @Override
+   public int getLength() {
+   return -1;
+   }
+
+		@Override
+		public void serialize(DeweyNumber record, DataOutputView target) throws IOException {
+			final int size = record.length();
+			target.writeInt(size);
+			for (int i = 0; i < size; i++) {
+				elemSerializer.serialize(record.deweyNumber[i], target);
+			}
+		}
+
+		@Override
+		public DeweyNumber deserialize(DataInputView source) throws IOException {
+			final int size = source.readInt();
+			final int[] number = new int[size];
+			for (int i = 0; i < size; i++) {
+				number[i] = elemSerializer.deserialize(source);
+			}
+			return new DeweyNumber(number);
+		}
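A quick round-trip sketch of how such a singleton serializer is exercised (a hedged example using the stream wrappers from org.apache.flink.core.memory; the assert is purely illustrative):

	public void roundTripDeweyNumber() throws IOException {
		DeweyNumber number = new DeweyNumber(1).addStage().increase(1);
		TypeSerializer<DeweyNumber> serializer = new DeweyNumber.DeweyNumberSerializer();

		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		serializer.serialize(number, new DataOutputViewStreamWrapper(bytes));

		DeweyNumber copy = serializer.deserialize(
			new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));
		assert number.equals(copy);
	}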

[9/9] flink git commit: [FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs.

2017-05-17 Thread kkloudas
[FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00ce3f1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00ce3f1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00ce3f1b

Branch: refs/heads/master
Commit: 00ce3f1b12c7d7bf996d5f91bf006f0e18a719e7
Parents: 7a54d05
Author: Dawid Wysakowicz 
Authored: Wed May 17 09:16:08 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:37:35 2017 +0200

--
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 11 ++-
 .../org/apache/flink/cep/nfa/NFAITCase.java | 82 
 2 files changed, 89 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/00ce3f1b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index ab5cd8e..a977a7f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -418,6 +418,7 @@ public class NFA<T> implements Serializable {
 		final List<StateTransition<T>> edges = outgoingEdges.getEdges();
 		int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
 		int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
+		int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
 
 		final List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
 		for (StateTransition<T> edge : edges) {
@@ -433,7 +434,9 @@ public class NFA<T> implements Serializable {
 						version = computationState.getVersion().increase(toIncrease);
 					} else {
 						//IGNORE after PROCEED
-						version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+						version = computationState.getVersion()
+							.increase(totalTakeToSkip + ignoreBranchesToVisit)
+							.addStage();
 						ignoreBranchesToVisit--;
 					}
 
@@ -457,8 +460,8 @@ public class NFA<T> implements Serializable {
 
 					final T previousEvent = computationState.getEvent();
 
-					final DeweyNumber currentVersion = computationState.getVersion();
-					final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+					final DeweyNumber currentVersion = computationState.getVersion().increase(takeBranchesToVisit);
+					final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage();
 					takeBranchesToVisit--;
 
 					final int counter;
@@ -573,7 +576,7 @@ public class NFA<T> implements Serializable {
 	}
 
 	private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
-		return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1;
+		return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches);
 	}
 
 	private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) {

http://git-wip-us.apache.org/repos/asf/flink/blob/00ce3f1b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 012e112..d00bbb7 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -3974,6 +3974,88 @@ public class NFAITCase extends TestLogger {
}
 
@Test
+   public void testMultipleTakesVersionCollision() {
+   List inputEvents = 
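The gist of the fix, expressed directly in DeweyNumber arithmetic (a hedged illustration; the digit values follow from increase() and addStage() as used in the hunks above):

	// a computation state with version "1" has two outgoing TAKE edges
	DeweyNumber base = new DeweyNumber(1);

	// first TAKE: one sibling TAKE is still pending (takeBranchesToVisit == 1),
	// so the branch moves past it before opening a new stage
	DeweyNumber firstTake = new DeweyNumber(base.increase(1)).addStage();   // 2.0

	// second TAKE: nothing left to skip (takeBranchesToVisit == 0)
	DeweyNumber secondTake = new DeweyNumber(base.increase(0)).addStage();  // 1.0

	// the run that stays in the state skips past both TAKE families:
	// calculateIncreasingSelfState(0, 2) now yields Math.max(1, 2) == 2
	DeweyNumber staying = base.increase(2);                                 // 3

	// each surviving run extends a distinct prefix, so versions no longer collide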

[4/9] flink git commit: [FLINK-6536] [cep] Improve error message in SharedBuffer::put().

2017-05-17 Thread kkloudas
[FLINK-6536] [cep] Improve error message in SharedBuffer::put().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02ea418f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02ea418f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02ea418f

Branch: refs/heads/master
Commit: 02ea418fb307876e1b957cad6be619a4d035d829
Parents: ae9c9d0
Author: kl0u 
Authored: Thu May 11 11:39:00 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:37:32 2017 +0200

--
 .../src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java  | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/02ea418f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 418bd4a..decf577 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -97,10 +97,11 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 		// sanity check whether we've found the previous element
 		if (previousSharedBufferEntry == null && previousValue != null) {
-			throw new IllegalStateException("Could not find previous shared buffer entry with " +
+			throw new IllegalStateException("Could not find previous entry with " +
 				"key: " + previousKey + ", value: " + previousValue + " and timestamp: " +
-				previousTimestamp + ". This can indicate that the element belonging to the previous " +
-				"relation has been already pruned, even though you expect it to be still there.");
+				previousTimestamp + ". This can indicate that either you did not implement " +
+				"the equals() and hashCode() methods of your input elements properly or that " +
+				"the element belonging to that entry has been already pruned.");
 		}
 
 		put(key, value, timestamp, previousSharedBufferEntry, version);



[7/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.

2017-05-17 Thread kkloudas
http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 88a5703..824df2d 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -32,9 +32,9 @@ import 
org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.net.URL;
@@ -57,7 +57,6 @@ public class CEPMigration11to13Test {
}
 
@Test
-   @Ignore
public void testKeyedCEPOperatorMigratation() throws Exception {
 
KeySelector keySelector = new 
KeySelector() {
@@ -136,11 +135,58 @@ public class CEPMigration11to13Test {
assertEquals(middleEvent, patternMap.get("middle").get(0));
assertEquals(endEvent, patternMap.get("end").get(0));
 
+		// and now go for a checkpoint with the new serializers
+
+		final Event startEvent1 = new Event(42, "start", 2.0);
+		final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0);
+		final Event endEvent1 = new Event(42, "end", 2.0);
+
+		harness.processElement(new StreamRecord<Event>(startEvent1, 21));
+		harness.processElement(new StreamRecord<Event>(middleEvent1, 23));
+
+		// simulate snapshot/restore with some elements in internal sorting queue
+		OperatorStateHandles snapshot = harness.snapshot(1L, 1L);
+		harness.close();
+
+		harness = new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+						Event.createTypeSerializer(),
+						false,
+						IntSerializer.INSTANCE,
+						new NFAFactory(),
+						true),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.setup();
+		harness.initializeState(snapshot);
+		harness.open();
+
+		harness.processElement(new StreamRecord<>(endEvent1, 25));
+
+		harness.processWatermark(new Watermark(50));
+
+		result = harness.getOutput();
+
+		// watermark and the result
+		assertEquals(2, result.size());
+
+		Object resultObject1 = result.poll();
+		assertTrue(resultObject1 instanceof StreamRecord);
+		StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+		assertTrue(resultRecord1.getValue() instanceof Map);
+
+		@SuppressWarnings("unchecked")
+		Map<String, List<Event>> patternMap1 = (Map<String, List<Event>>) resultRecord1.getValue();
+
+		assertEquals(startEvent1, patternMap1.get("start").get(0));
+		assertEquals(middleEvent1, patternMap1.get("middle").get(0));
+		assertEquals(endEvent1, patternMap1.get("end").get(0));
+
 		harness.close();
 	}
 
 	@Test
-	@Ignore
 	public void testNonKeyedCEPFunctionMigration() throws Exception {
 
 		final Event startEvent = new Event(42, "start", 1.0);
@@ -191,7 +237,7 @@ public class CEPMigration11to13Test {
 		harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
 		harness.processElement(new StreamRecord<>(endEvent, 5));
 
-		harness.processWatermark(new Watermark(Long.MAX_VALUE));
+		harness.processWatermark(new Watermark(20));
 
 		ConcurrentLinkedQueue<Object> result = harness.getOutput();
 
@@ -210,6 +256,54 @@ public class CEPMigration11to13Test {
 		assertEquals(middleEvent, patternMap.get("middle").get(0));
 		assertEquals(endEvent, patternMap.get("end").get(0));
 
+

[5/9] flink git commit: [hotfix] [cep] Remove unused keySelector in operator.

2017-05-17 Thread kkloudas
[hotfix] [cep] Remove unused keySelector in operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7ebcb07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7ebcb07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7ebcb07

Branch: refs/heads/master
Commit: f7ebcb07edecc06f523e4f46dfaa3ec10d1e90c8
Parents: 8e4db42
Author: kkloudas 
Authored: Mon May 15 14:49:00 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:37:33 2017 +0200

--
 .../cep/operator/AbstractKeyedCEPPatternOperator.java   |  6 --
 .../org/apache/flink/cep/operator/CEPOperatorUtils.java |  6 --
 .../flink/cep/operator/KeyedCEPPatternOperator.java |  4 +---
 .../cep/operator/TimeoutKeyedCEPPatternOperator.java|  4 +---
 .../flink/cep/operator/CEPFrom12MigrationTest.java  |  6 --
 .../flink/cep/operator/CEPMigration11to13Test.java  |  2 --
 .../org/apache/flink/cep/operator/CEPOperatorTest.java  | 12 
 .../org/apache/flink/cep/operator/CEPRescalingTest.java |  1 -
 8 files changed, 6 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7068bc4..bac21b3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -78,9 +77,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 
 	private final TypeSerializer<IN> inputSerializer;
 
-	// necessary to extract the key from the input elements
-	private final KeySelector<IN, KEY> keySelector;
-
 	// necessary to serialize the set of seen keys
 	private final TypeSerializer<KEY> keySerializer;
 
@@ -112,14 +108,12 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	public AbstractKeyedCEPPatternOperator(
 			final TypeSerializer<IN> inputSerializer,
 			final boolean isProcessingTime,
-			final KeySelector<IN, KEY> keySelector,
 			final TypeSerializer<KEY> keySerializer,
 			final NFACompiler.NFAFactory<IN> nfaFactory,
 			final boolean migratingFromOldKeyedOperator) {
 
 		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
 		this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
-		this.keySelector = Preconditions.checkNotNull(keySelector);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
--
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 08424a4..e7b7e65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -63,7 +63,6 @@ public class CEPOperatorUtils {
 			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
 			KeyedStream<T, ?> keyedStream = (KeyedStream<T, ?>) inputStream;
 
-			KeySelector<T, ?> keySelector = keyedStream.getKeySelector();
 			TypeSerializer<?> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
 
   

[1/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map<String, List>.

2017-05-17 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/master 9244106b3 -> 00ce3f1b1


http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
--
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 2cc67e5..46e2fd4 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.common.primitives.Doubles;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
@@ -156,22 +155,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent: inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			resultingPatterns.addAll(patterns);
-		}
-
-		assertEquals(1, resultingPatterns.size());
-		Map<String, Event> patternMap = resultingPatterns.get(0);
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(startEvent, patternMap.get("start"));
-		assertEquals(middleEvent, patternMap.get("middle"));
-		assertEquals(endEvent, patternMap.get("end"));
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(startEvent, middleEvent, endEvent)
+		));
 	}
 
 	@Test
@@ -202,24 +190,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-		List<Collection<Event>> allPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-				allPatterns.add(foundPattern.values());
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(1, allPatterns.size());
-		assertEquals(Sets.newHashSet(
-			Sets.newHashSet(middleEvent1, end)
-		), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(middleEvent1, end)
+		));
 	}
 
 	@Test
@@ -252,19 +227,9 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		Set<Set<Event>> resultingPatterns = new HashSet<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, Event>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, Event> foundPattern : patterns) {
-				resultingPatterns.add(new HashSet<>(foundPattern.values()));
-			}
-		}
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-		assertEquals(Sets.newHashSet(), resultingPatterns);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList());
 	}
 
 	/**
@@ -274,7 +239,6 @@ public class NFAITCase extends TestLogger {
 	@Test
 	public void testSimplePatternWithTimeWindowNFA() {
 		List<StreamRecord<Event>> events = new ArrayList<>();
-		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
 
 		final Event startEvent;
 		final Event middleEvent;
@@ -313,21 +277,11 @@ public class NFAITCase extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		for (StreamRecord<Event> event: events) {

[6/9] flink git commit: [FLINK-6578] [cep] Fix self-loop handling in SharedBuffer.

2017-05-17 Thread kkloudas
[FLINK-6578] [cep] Fix self-loop handling in SharedBuffer.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e4db423
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e4db423
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e4db423

Branch: refs/heads/master
Commit: 8e4db423b79580de0cf66e905f8a66c12ea3748a
Parents: 05ad87f
Author: kkloudas 
Authored: Mon May 15 14:33:09 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:37:33 2017 +0200

--
 .../apache/flink/cep/nfa/ComputationState.java  |  15 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  34 +-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 117 +++---
 .../java/org/apache/flink/cep/nfa/State.java|   6 +-
 .../apache/flink/cep/nfa/StateTransition.java   |  21 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java | 364 +++
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  78 ++--
 .../cep/operator/CEPFrom12MigrationTest.java|   3 +
 .../cep/operator/CEPMigration11to13Test.java|   3 +
 .../flink/cep/operator/CEPOperatorTest.java | 201 ++
 10 files changed, 728 insertions(+), 114 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8e4db423/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 08b9b78..44f8f39 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -40,6 +40,8 @@ public class ComputationState {
// the last taken event
private final T event;
 
+   private final int counter;
+
// timestamp of the last taken event
private final long timestamp;
 
@@ -58,11 +60,13 @@ public class ComputationState {
final State currentState,
final State previousState,
final T event,
+   final int counter,
final long timestamp,
final DeweyNumber version,
final long startTimestamp) {
this.state = currentState;
this.event = event;
+   this.counter = counter;
this.timestamp = timestamp;
this.version = version;
this.startTimestamp = startTimestamp;
@@ -70,6 +74,10 @@ public class ComputationState {
this.conditionContext = new ConditionContext(nfa, this);
}
 
+   public int getCounter() {
+   return counter;
+   }
+
public ConditionContext getConditionContext() {
return conditionContext;
}
@@ -108,12 +116,12 @@ public class ComputationState {
 
public static  ComputationState createStartState(final NFA 
nfa, final State state) {
Preconditions.checkArgument(state.isStart());
-   return new ComputationState<>(nfa, state, null, null, -1L, new 
DeweyNumber(1), -1L);
+   return new ComputationState<>(nfa, state, null, null, 0, -1L, 
new DeweyNumber(1), -1L);
}
 
public static  ComputationState createStartState(final NFA 
nfa, final State state, final DeweyNumber version) {
Preconditions.checkArgument(state.isStart());
-   return new ComputationState<>(nfa, state, null, null, -1L, 
version, -1L);
+   return new ComputationState<>(nfa, state, null, null, 0, -1L, 
version, -1L);
}
 
public static  ComputationState createState(
@@ -121,10 +129,11 @@ public class ComputationState {
final State currentState,
final State previousState,
final T event,
+   final int counter,
final long timestamp,
final DeweyNumber version,
final long startTimestamp) {
-   return new ComputationState<>(nfa, currentState, previousState, 
event, timestamp, version, startTimestamp);
+   return new ComputationState<>(nfa, currentState, previousState, 
event, counter, timestamp, version, startTimestamp);
}
 
public boolean isStopState() {
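The counter added here is what allows SharedBuffer to tell apart entries that a looping state produces for the same value and timestamp. Follow-up states are now created with it explicitly (a sketch of the call shape only; the variable names are illustrative):

	ComputationState<Event> next = ComputationState.createState(
		nfa,
		targetState,     // state reached by the transition
		sourceState,     // state in which the event was taken
		event,
		counter,         // per-state event counter, new in this change
		timestamp,
		version,
		startTimestamp);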

http://git-wip-us.apache.org/repos/asf/flink/blob/8e4db423/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
--
diff --git 

[3/9] flink git commit: [FLINK-6255] [cep] Remove PatternStream.getSideOutput().

2017-05-17 Thread kkloudas
[FLINK-6255] [cep] Remove PatternStream.getSideOutput().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05ad87f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05ad87f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05ad87f4

Branch: refs/heads/master
Commit: 05ad87f4ce8c0aea6944feb14bf19795c1fc56c9
Parents: 02ea418
Author: kl0u 
Authored: Fri May 12 16:01:38 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:37:32 2017 +0200

--
 docs/dev/libs/cep.md| 41 +---
 .../apache/flink/cep/scala/PatternStream.scala  | 35 +--
 .../org/apache/flink/cep/PatternStream.java | 55 +--
 .../AbstractKeyedCEPPatternOperator.java| 24 -
 .../flink/cep/operator/CEPOperatorUtils.java|  9 +-
 .../cep/operator/KeyedCEPPatternOperator.java   |  4 +-
 .../TimeoutKeyedCEPPatternOperator.java |  4 +-
 .../java/org/apache/flink/cep/CEPITCase.java| 98 +---
 .../cep/operator/CEPFrom12MigrationTest.java|  6 --
 .../cep/operator/CEPMigration11to13Test.java|  2 -
 .../flink/cep/operator/CEPOperatorTest.java |  2 -
 .../flink/cep/operator/CEPRescalingTest.java|  1 -
 12 files changed, 13 insertions(+), 268 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/docs/dev/libs/cep.md
--
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index b379615..58e1a0a 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -806,46 +806,7 @@ in event time.
 
 To also guarantee that elements across watermarks are processed in event-time 
order, Flink's CEP library assumes 
 *correctness of the watermark*, and considers as *late* elements whose 
timestamp is smaller than that of the last 
-seen watermark. Late elements are not further processed but they can be 
redirected to a [side output]
-({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them.
-
-To access the stream of late elements, you first need to specify that you want 
to get the late data using 
-`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the 
`CEP.pattern(...)` call. If you do not do
-so, the late elements will be silently dropped. Then, you can get the 
side-output stream using the 
-`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and 
providing as argument the output tag used in 
-the `.sideOutputLateData(OutputTag)`:
-
-
-
-{% highlight java %}
-final OutputTag lateOutputTag = new OutputTag("late-data"){};
-
-PatternStream patternStream = CEP.pattern(...)
-.sideOutputLateData(lateOutputTag);
-
-// main output with matches
-DataStream result = patternStream.select(...)
-
-// side output containing the late events
-DataStream lateStream = patternStream.getSideOutput(lateOutputTag);
-{% endhighlight %}
-
-
-
-{% highlight scala %}
-val lateOutputTag = OutputTag[T]("late-data")
-
-val patternStream: PatternStream[T] = CEP.pattern(...)
-.sideOutputLateData(lateOutputTag)
-
-// main output with matches
-val result = patternStream.select(...)
-
-// side output containing the late events
-val lateStream = patternStream.getSideOutput(lateOutputTag)
-{% endhighlight %}
-
-
+seen watermark. Late elements are not further processed.
 
 ## Examples
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
--
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index d4bc28c..e71439c 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, 
PatternStream => JPatternStream}
 import org.apache.flink.cep.pattern.{Pattern => JPattern}
 import org.apache.flink.streaming.api.scala.{asScalaStream, _}
-import org.apache.flink.util.{Collector, OutputTag}
+import org.apache.flink.util.Collector
 import org.apache.flink.types.{Either => FEither}
 import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
 import java.lang.{Long => JLong}
 
-import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.cep.operator.CEPOperatorUtils
 import org.apache.flink.cep.scala.pattern.Pattern
 
@@ 

[2/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map<String, List>.

2017-05-17 Thread kkloudas
[FLINK-6371] [cep] NFA return matched patterns as Map<String, List>.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae9c9d06
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae9c9d06
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae9c9d06

Branch: refs/heads/master
Commit: ae9c9d061a8b49931c88908b8713cb2efe5f9202
Parents: 9244106
Author: kl0u 
Authored: Fri May 5 13:55:07 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:37:31 2017 +0200

--
 .../org/apache/flink/cep/CEPLambdaTest.java |  11 +-
 .../apache/flink/cep/scala/PatternStream.scala  |  31 +-
 ...StreamScalaJavaAPIInteroperabilityTest.scala |  33 +-
 .../flink/cep/PatternFlatSelectFunction.java|   3 +-
 .../flink/cep/PatternFlatTimeoutFunction.java   |   3 +-
 .../apache/flink/cep/PatternSelectFunction.java |   3 +-
 .../org/apache/flink/cep/PatternStream.java |  29 +-
 .../flink/cep/PatternTimeoutFunction.java   |   3 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 109 +---
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  10 +-
 .../flink/cep/operator/CEPOperatorUtils.java|  19 +-
 .../cep/operator/KeyedCEPPatternOperator.java   |  17 +-
 .../TimeoutKeyedCEPPatternOperator.java |  23 +-
 .../java/org/apache/flink/cep/CEPITCase.java|  69 +--
 .../org/apache/flink/cep/nfa/NFAITCase.java | 608 +++
 .../java/org/apache/flink/cep/nfa/NFATest.java  |  62 +-
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  17 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   1 -
 .../cep/operator/CEPFrom12MigrationTest.java|  57 +-
 .../cep/operator/CEPMigration11to13Test.java|  21 +-
 .../flink/cep/operator/CEPOperatorTest.java |  41 +-
 .../flink/cep/operator/CEPRescalingTest.java|  31 +-
 22 files changed, 474 insertions(+), 727 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
--
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java 
b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
index 5957158..03fb3c6 100644
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -24,10 +24,13 @@ import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.*;
@@ -41,6 +44,7 @@ public class CEPLambdaTest extends TestLogger {
 * Tests that a Java8 lambda can be passed as a CEP select function
 */
@Test
+   @Ignore
public void testLambdaSelectFunction() {
TypeInformation eventTypeInformation = 
TypeExtractor.getForClass(EventA.class);
TypeInformation outputTypeInformation = 
TypeExtractor.getForClass(EventB.class);
@@ -59,16 +63,17 @@ public class CEPLambdaTest extends TestLogger {
PatternStream patternStream = new 
PatternStream<>(inputStream, dummyPattern);
 
		DataStream<EventB> result = patternStream.select(
-			map -> new EventB()
+			(Map<String, List<EventA>> map) -> new EventB()
		);
 
assertEquals(outputTypeInformation, result.getType());
}
 
/**
-* Tests that a Java8 labmda can be passed as a CEP flat select function
+* Tests that a Java8 lambda can be passed as a CEP flat select function
 */
@Test
+   @Ignore
public void testLambdaFlatSelectFunction() {
TypeInformation eventTypeInformation = 
TypeExtractor.getForClass(EventA.class);
TypeInformation outputTypeInformation = 
TypeExtractor.getForClass(EventB.class);
@@ -86,7 +91,7 @@ public class CEPLambdaTest extends TestLogger {
PatternStream patternStream = new 
PatternStream<>(inputStream, dummyPattern);
 
		DataStream<EventB> result = patternStream.flatSelect(
-			(map, collector) -> collector.collect(new EventB())
+			(Map<String, List<EventA>> map, Collector<EventB> collector) -> collector.collect(new EventB())
		);
 
assertEquals(outputTypeInformation, result.getType());
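The explicit parameter type above is the user-visible consequence of FLINK-6371: select and flatSelect callbacks now receive a Map<String, List<T>> per match, one event list per pattern state. A minimal usage sketch (Event and patternStream are assumed):

	DataStream<String> matches = patternStream.select(
		(Map<String, List<Event>> pattern) ->
			// each name maps to every event matched by that state, not just the last one
			"start=" + pattern.get("start").size() + ", end=" + pattern.get("end").size());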


flink git commit: [FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser

2017-05-17 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.3 954a100dd -> fe1316b33


[FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser

This closes #3923.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe1316b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe1316b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe1316b3

Branch: refs/heads/release-1.3
Commit: fe1316b335cb970b7b721359d8a69e12fef700ba
Parents: 954a100
Author: twalthr 
Authored: Mon May 15 15:27:10 2017 +0200
Committer: twalthr 
Committed: Wed May 17 14:21:12 2017 +0200

--
 docs/dev/table_api.md   |  24 +-
 .../org/apache/flink/table/api/table.scala  |  16 +-
 .../apache/flink/table/codegen/Compiler.scala   |   4 +-
 .../table/expressions/ExpressionParser.scala| 397 ---
 .../flink/table/plan/ProjectionTranslator.scala |   2 +-
 .../flink/table/validate/FunctionCatalog.scala  |  26 +-
 .../CastingStringExpressionTest.scala   |   6 +-
 .../table/expressions/DecimalTypeTest.scala |   4 +-
 .../table/expressions/ScalarFunctionsTest.scala |   6 +-
 .../table/expressions/TemporalTypesTest.scala   |  32 +-
 .../UserDefinedScalarFunctionTest.scala |  31 +-
 .../plan/util/RexProgramExtractorTest.scala |   6 +-
 12 files changed, 251 insertions(+), 303 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fe1316b3/docs/dev/table_api.md
--
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index d105188..6a5ceee 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1038,9 +1038,9 @@ This is the EBNF grammar for expressions:
 
 expressionList = expression , { "," , expression } ;
 
-expression = alias ;
+expression = timeIndicator | overConstant | alias ;
 
-alias = logic | ( logic , "AS" , fieldReference ) ;
+alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , 
fieldReference , { "," , fieldReference } , ")" ) ;
 
 logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
 
@@ -1052,9 +1052,11 @@ product = unary , [ ( "*" | "/" | "%") , unary ] ;
 
 unary = [ "!" | "-" ] , composite ;
 
-composite = suffixed | atom ;
+composite = over | nullLiteral | suffixed | atom ;
 
-suffixed = interval | cast | as | aggregation | if | functionCall ;
+suffixed = interval | cast | as | if | functionCall ;
+
+interval = timeInterval | rowInterval ;
 
 timeInterval = composite , "." , ("year" | "years" | "month" | "months" | 
"day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" 
| "milli" | "millis") ;
 
@@ -1062,17 +1064,15 @@ rowInterval = composite , "." , "rows" ;
 
 cast = composite , ".cast(" , dataType , ")" ;
 
-dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" 
| "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP" | "INTERVAL_MONTHS" | 
"INTERVAL_MILLIS" ;
+dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" 
| "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | 
"INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "PRIMITIVE_ARRAY" , "(" , dataType , 
")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
 
 as = composite , ".as(" , fieldReference , ")" ;
 
-aggregation = composite , ( ".sum" | ".sum0" | ".min" | ".max" | ".count" | 
".avg" | ".start" | ".end" | ".stddev_pop" | ".stddev_samp" | ".var_pop" | 
".var_samp" ) , [ "()" ] ;
-
 if = composite , ".?(" , expression , "," , expression , ")" ;
 
 functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { 
"," , expression } ] , ")" ] ;
 
-atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ;
+atom = ( "(" , expression , ")" ) | literal | fieldReference ;
 
 fieldReference = "*" | identifier ;
 
@@ -1082,10 +1082,16 @@ timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | 
"DAY" | "DAY_TO_HOUR" |
 
 timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | 
"QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
 
+over = composite , "over" , fieldReference ;
+
+overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_range" ;
+
+timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;
+
 {% endhighlight %}
 
 Here, `literal` is a valid Java literal, `fieldReference` specifies a column 
in the data (or all columns if `*` is used), and `functionIdentifier` specifies 
a supported scalar function. The
-column names and function names follow Java identifier syntax. The column name 
`rowtime` is a reserved logical attribute in streaming environments. 
Expressions specified as Strings can also use prefix notation instead of suffix 
notation to call 
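Two expression strings that parse under the revised grammar (a hedged sketch; table, a and b are assumed inputs):

	// suffix notation, using the renamed SQL_* type identifiers from the grammar above
	Table result = table.select("a.cast(SQL_TIMESTAMP).as(ts), b.avg");

	// the aggregation written in prefix notation instead
	Table result2 = table.select("a.cast(SQL_TIMESTAMP).as(ts), avg(b)");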

flink git commit: [FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser

2017-05-17 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master a2580171d -> 9244106b3


[FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser

This closes #3923.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9244106b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9244106b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9244106b

Branch: refs/heads/master
Commit: 9244106b334ef54ba3e39a3f2c0c76f46ae4ecd3
Parents: a258017
Author: twalthr 
Authored: Mon May 15 15:27:10 2017 +0200
Committer: twalthr 
Committed: Wed May 17 14:15:07 2017 +0200

--
 docs/dev/table_api.md   |  24 +-
 .../org/apache/flink/table/api/table.scala  |  16 +-
 .../apache/flink/table/codegen/Compiler.scala   |   4 +-
 .../table/expressions/ExpressionParser.scala| 397 ---
 .../flink/table/plan/ProjectionTranslator.scala |   2 +-
 .../flink/table/validate/FunctionCatalog.scala  |  26 +-
 .../CastingStringExpressionTest.scala   |   6 +-
 .../table/expressions/DecimalTypeTest.scala |   4 +-
 .../table/expressions/ScalarFunctionsTest.scala |   6 +-
 .../table/expressions/TemporalTypesTest.scala   |  32 +-
 .../UserDefinedScalarFunctionTest.scala |  31 +-
 .../plan/util/RexProgramExtractorTest.scala |   6 +-
 12 files changed, 251 insertions(+), 303 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/docs/dev/table_api.md
--
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index d105188..6a5ceee 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1038,9 +1038,9 @@ This is the EBNF grammar for expressions:
 
 expressionList = expression , { "," , expression } ;
 
-expression = alias ;
+expression = timeIndicator | overConstant | alias ;
 
-alias = logic | ( logic , "AS" , fieldReference ) ;
+alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , 
fieldReference , { "," , fieldReference } , ")" ) ;
 
 logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
 
@@ -1052,9 +1052,11 @@ product = unary , [ ( "*" | "/" | "%") , unary ] ;
 
 unary = [ "!" | "-" ] , composite ;
 
-composite = suffixed | atom ;
+composite = over | nullLiteral | suffixed | atom ;
 
-suffixed = interval | cast | as | aggregation | if | functionCall ;
+suffixed = interval | cast | as | if | functionCall ;
+
+interval = timeInterval | rowInterval ;
 
 timeInterval = composite , "." , ("year" | "years" | "month" | "months" | 
"day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" 
| "milli" | "millis") ;
 
@@ -1062,17 +1064,15 @@ rowInterval = composite , "." , "rows" ;
 
 cast = composite , ".cast(" , dataType , ")" ;
 
-dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" 
| "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP" | "INTERVAL_MONTHS" | 
"INTERVAL_MILLIS" ;
+dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" 
| "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | 
"INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "PRIMITIVE_ARRAY" , "(" , dataType , 
")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
 
 as = composite , ".as(" , fieldReference , ")" ;
 
-aggregation = composite , ( ".sum" | ".sum0" | ".min" | ".max" | ".count" | 
".avg" | ".start" | ".end" | ".stddev_pop" | ".stddev_samp" | ".var_pop" | 
".var_samp" ) , [ "()" ] ;
-
 if = composite , ".?(" , expression , "," , expression , ")" ;
 
 functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { 
"," , expression } ] , ")" ] ;
 
-atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ;
+atom = ( "(" , expression , ")" ) | literal | fieldReference ;
 
 fieldReference = "*" | identifier ;
 
@@ -1082,10 +1082,16 @@ timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | 
"DAY" | "DAY_TO_HOUR" |
 
 timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | 
"QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
 
+over = composite , "over" , fieldReference ;
+
+overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_range" ;
+
+timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;
+
 {% endhighlight %}
 
 Here, `literal` is a valid Java literal, `fieldReference` specifies a column 
in the data (or all columns if `*` is used), and `functionIdentifier` specifies 
a supported scalar function. The
-column names and function names follow Java identifier syntax. The column name 
`rowtime` is a reserved logical attribute in streaming environments. 
Expressions specified as Strings can also use prefix notation instead of suffix 
notation to call operators and 

flink-web git commit: Remove term 'official' from docker blog post and add disclaimer

2017-05-17 Thread rmetzger
Repository: flink-web
Updated Branches:
  refs/heads/asf-site f467722b5 -> edbd65a2e


Remove term 'official' from docker blog post and add disclaimer


Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/edbd65a2
Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/edbd65a2
Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/edbd65a2

Branch: refs/heads/asf-site
Commit: edbd65a2e07cca32ad905c0cab5ce498ab0e79e9
Parents: f467722
Author: Robert Metzger 
Authored: Wed May 17 12:18:41 2017 +0200
Committer: Robert Metzger 
Committed: Wed May 17 12:28:04 2017 +0200

--
 _config.yml|  1 -
 _posts/2017-05-16-official-docker-image.md |  8 +---
 content/blog/feed.xml  |  8 +---
 content/blog/index.html|  6 +++---
 content/blog/page2/index.html  |  2 +-
 content/blog/page3/index.html  |  2 +-
 content/blog/page4/index.html  |  2 +-
 content/blog/page5/index.html  |  2 +-
 content/index.html |  4 ++--
 content/news/2017/05/16/official-docker-image.html | 10 ++
 10 files changed, 25 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink-web/blob/edbd65a2/_config.yml
--
diff --git a/_config.yml b/_config.yml
index 30db1b5..b94c71c 100644
--- a/_config.yml
+++ b/_config.yml
@@ -97,4 +97,3 @@ host: 0.0.0.0
 # News Posts
 paginate: 10
 paginate_path: "blog/page:num"
-

http://git-wip-us.apache.org/repos/asf/flink-web/blob/edbd65a2/_posts/2017-05-16-official-docker-image.md
--
diff --git a/_posts/2017-05-16-official-docker-image.md 
b/_posts/2017-05-16-official-docker-image.md
index d452abd..e2c76be 100644
--- a/_posts/2017-05-16-official-docker-image.md
+++ b/_posts/2017-05-16-official-docker-image.md
@@ -1,17 +1,17 @@
 ---
 layout: post
-title:  "Introducing Official Docker Images for Apache Flink"
+title:  "Introducing Docker Images for Apache Flink"
 date:   2017-05-16 09:00:00
 author: "Patrick Lucas (Data Artisans) and Ismaël Mejía (Talend)"
 author-twitter: "iemejia"
 categories: news
 ---
 
-For some time, the Apache Flink community has provided scripts to build a 
Docker image to run Flink. Now, starting with version 1.2.1, Flink will have an 
[official Docker image](https://hub.docker.com/r/_/flink/). This image is 
maintained by the Flink community and curated by the 
[Docker](https://github.com/docker-library/official-images) team to ensure it 
meets the quality standards for container images of the Docker community.
+For some time, the Apache Flink community has provided scripts to build a 
Docker image to run Flink. Now, starting with version 1.2.1, Flink will have a 
[Docker image](https://hub.docker.com/r/_/flink/) on the Docker Hub. This image 
is maintained by the Flink community and curated by the 
[Docker](https://github.com/docker-library/official-images) team to ensure it 
meets the quality standards for container images of the Docker community.
 
 A community-maintained way to run Apache Flink on Docker and other container 
runtimes and orchestrators is part of the ongoing effort by the Flink community 
to make Flink a first-class citizen of the container world.
 
-If you want to use the official Docker image today you can get the latest 
version by running:
+If you want to use the Docker image today you can get the latest version by 
running:
 
docker pull flink
 
@@ -24,3 +24,5 @@ With this image there are various ways to start a Flink 
cluster, both locally an
 While this announcement is an important milestone, it’s just the first step 
to help users run containerized Flink in production. There are 
[improvements](https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20Docker%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC)
 to be made in Flink itself and we will continue to improve these Docker images 
and for the documentation and examples surrounding them.
 
 This is of course a team effort, so any contribution is welcome. The 
[docker-flink](https://github.com/docker-flink) GitHub organization hosts the 
source files to [generate the 
images](https://github.com/docker-flink/docker-flink) and the 
[documentation](https://github.com/docker-flink/docs/tree/master/flink) that is 
presented alongside the images on Docker Hub.
+
+*Disclaimer: The docker images are provided as a community project by individuals on a best-effort basis. They are not official releases by the Apache Flink PMC.*

flink git commit: [FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule

2017-05-17 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.3 60873b0c5 -> 954a100dd


[FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule

This closes #3924.
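
For context, a sketch of the kind of SQL group-window query this rule rewrites,
assuming a Flink 1.3 StreamTableEnvironment with a registered table Orders whose
rowtime attribute is a time indicator (all names here are hypothetical; the
snippet is illustrative, not taken from the commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    // Before this change the rule replaced the time attribute with a zero literal
    // on both the input and the output side; now the input side keeps the time
    // indicator and only the output group expression becomes a zero TIMESTAMP literal.
    Table result = tEnv.sql(
        "SELECT COUNT(*) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)");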


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954a100d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954a100d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954a100d

Branch: refs/heads/release-1.3
Commit: 954a100dd5fe6ecbb2aac40f33e472d85a5822df
Parents: 60873b0
Author: twalthr 
Authored: Tue May 16 16:34:54 2017 +0200
Committer: twalthr 
Committed: Wed May 17 11:54:46 2017 +0200

--
 .../DataStreamLogicalWindowAggregateRule.scala  | 34 +++-
 .../scala/stream/sql/WindowAggregateTest.scala  |  6 ++--
 .../calcite/RelTimeIndicatorConverterTest.scala |  3 +-
 3 files changed, 23 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/954a100d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 38de539..050e2cd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -22,6 +22,7 @@ import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
 import org.apache.flink.table.api.{TableException, Window}
@@ -33,31 +34,34 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 class DataStreamLogicalWindowAggregateRule
   extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") {
 
-  /** Returns a zero literal of the correct time type */
+  /** Returns a reference to the time attribute with a time indicator type */
   override private[table] def getInAggregateGroupExpression(
       rexBuilder: RexBuilder,
-      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression)
-
-  /** Returns a zero literal of the correct time type */
-  override private[table] def getOutAggregateGroupExpression(
-      rexBuilder: RexBuilder,
-      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression)
-
-  private def createZeroLiteral(
-      rexBuilder: RexBuilder,
       windowExpression: RexCall): RexNode = {
 
-    val timeType = windowExpression.operands.get(0).getType
-    timeType match {
+    val timeAttribute = windowExpression.operands.get(0)
+    timeAttribute match {
 
-      case _ if FlinkTypeFactory.isTimeIndicatorType(timeType) =>
-        rexBuilder.makeLiteral(0L, timeType, true)
+      case _ if FlinkTypeFactory.isTimeIndicatorType(timeAttribute.getType) =>
+        timeAttribute
 
       case _ =>
-        throw TableException(s"""Time attribute expected but $timeType encountered.""")
+        throw TableException(
+          s"""Time attribute expected but ${timeAttribute.getType} encountered.""")
     }
   }
 
+  /** Returns a zero literal of a timestamp type */
+  override private[table] def getOutAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = {
+
+    rexBuilder.makeLiteral(
+      0L,
+      rexBuilder.getTypeFactory.createSqlType(SqlTypeName.TIMESTAMP),
+      true)
+  }
+
   override private[table] def translateWindowExpression(
       windowExpr: RexCall,
       rowType: RelDataType): Window = {

http://git-wip-us.apache.org/repos/asf/flink/blob/954a100d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 3729ef0..2022db8 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ 

flink git commit: [FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule

2017-05-17 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 7ad489d87 -> a2580171d


[FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule

This closes #3924.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2580171
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2580171
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2580171

Branch: refs/heads/master
Commit: a2580171dd6e9044c0694deea83a2a2f1f9eb1ee
Parents: 7ad489d
Author: twalthr 
Authored: Tue May 16 16:34:54 2017 +0200
Committer: twalthr 
Committed: Wed May 17 11:53:31 2017 +0200

--
 .../DataStreamLogicalWindowAggregateRule.scala  | 34 +++-
 .../scala/stream/sql/WindowAggregateTest.scala  |  6 ++--
 .../calcite/RelTimeIndicatorConverterTest.scala |  3 +-
 3 files changed, 23 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 38de539..050e2cd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -22,6 +22,7 @@ import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
 import org.apache.flink.table.api.{TableException, Window}
@@ -33,31 +34,34 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 class DataStreamLogicalWindowAggregateRule
   extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") {
 
-  /** Returns a zero literal of the correct time type */
+  /** Returns a reference to the time attribute with a time indicator type */
   override private[table] def getInAggregateGroupExpression(
       rexBuilder: RexBuilder,
-      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression)
-
-  /** Returns a zero literal of the correct time type */
-  override private[table] def getOutAggregateGroupExpression(
-      rexBuilder: RexBuilder,
-      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression)
-
-  private def createZeroLiteral(
-      rexBuilder: RexBuilder,
       windowExpression: RexCall): RexNode = {
 
-    val timeType = windowExpression.operands.get(0).getType
-    timeType match {
+    val timeAttribute = windowExpression.operands.get(0)
+    timeAttribute match {
 
-      case _ if FlinkTypeFactory.isTimeIndicatorType(timeType) =>
-        rexBuilder.makeLiteral(0L, timeType, true)
+      case _ if FlinkTypeFactory.isTimeIndicatorType(timeAttribute.getType) =>
+        timeAttribute
 
       case _ =>
-        throw TableException(s"""Time attribute expected but $timeType encountered.""")
+        throw TableException(
+          s"""Time attribute expected but ${timeAttribute.getType} encountered.""")
     }
   }
 
+  /** Returns a zero literal of a timestamp type */
+  override private[table] def getOutAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = {
+
+    rexBuilder.makeLiteral(
+      0L,
+      rexBuilder.getTypeFactory.createSqlType(SqlTypeName.TIMESTAMP),
+      true)
+  }
+
   override private[table] def translateWindowExpression(
       windowExpr: RexCall,
       rowType: RelDataType): Window = {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 3729ef0..2022db8 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ 

[1/5] flink git commit: [FLINK-6555] [futures] Generalize ConjunctFuture to return results

2017-05-17 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master dceb5cc17 -> 7ad489d87


[FLINK-6555] [futures] Generalize ConjunctFuture to return results

The ConjunctFuture now returns the set of future values once it is completed.

Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture

The WaitingConjunctFuture waits for the completion of its futures. The future
values are discarded, making it more efficient than the ResultConjunctFuture,
which returns the futures' values. The WaitingConjunctFuture is instantiated via
FutureUtils.waitForAll(Collection).

This closes #3873.
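
A minimal usage sketch contrasting the two variants, using Flink's own Future
abstraction from org.apache.flink.runtime.concurrent (f1 and f2 are hypothetical
placeholders, and the exact generic signatures may differ slightly):

    import java.util.Arrays;
    import java.util.Collection;

    import org.apache.flink.runtime.concurrent.Future;
    import org.apache.flink.runtime.concurrent.FutureUtils;
    import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
    import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;

    // Two example futures that will eventually carry String results.
    FlinkCompletableFuture<String> f1 = new FlinkCompletableFuture<>();
    FlinkCompletableFuture<String> f2 = new FlinkCompletableFuture<>();

    // combineAll completes with the collection of all future values...
    ConjunctFuture<Collection<String>> all = FutureUtils.combineAll(Arrays.asList(f1, f2));

    // ...while waitForAll only signals completion and discards the values,
    // which is cheaper when only the "done" signal is needed.
    ConjunctFuture<Void> done = FutureUtils.waitForAll(Arrays.<Future<?>>asList(f1, f2));

    f1.complete("a");
    f2.complete("b");
    // 'all' now completes with ["a", "b"]; 'done' completes as well.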


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c081201f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c081201f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c081201f

Branch: refs/heads/master
Commit: c081201fd6c3c97a932c09d971f24bf42102650f
Parents: dceb5cc
Author: Till Rohrmann 
Authored: Thu May 11 17:36:17 2017 +0200
Committer: Till Rohrmann 
Committed: Wed May 17 08:18:23 2017 +0200

--
 .../flink/runtime/concurrent/FutureUtils.java   | 131 +++
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionJobVertex.java  |   4 +-
 .../executiongraph/failover/FailoverRegion.java |   2 +-
 .../runtime/concurrent/FutureUtilsTest.java |  83 ++--
 5 files changed, 184 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 4948147..a27af56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -106,8 +109,9 @@ public class FutureUtils {
 
	/**
	 * Creates a future that is complete once multiple other futures completed.
-	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
-	 * conjunction fails.
+	 * The future fails (completes exceptionally) once one of the futures in the
+	 * conjunction fails. Upon successful completion, the future returns the
+	 * collection of the futures' results.
	 *
	 * The ConjunctFuture gives access to how many Futures in the conjunction have already
	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
@@ -115,16 +119,16 @@ public class FutureUtils {
	 * @param futures The futures that make up the conjunction. No null entries are allowed.
	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
	 */
-	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
+	public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends Future<? extends T>> futures) {
		checkNotNull(futures, "futures");
 
-		final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
+		final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size());
 
		if (futures.isEmpty()) {
-			conjunct.complete(null);
+			conjunct.complete(Collections.emptyList());
		}
		else {
-			for (Future<?> future : futures) {
+			for (Future<? extends T> future : futures) {
				future.handle(conjunct.completionHandler);
			}
		}
@@ -133,16 +137,32 @@ public class FutureUtils {
	}
 
	/**
+	 * Creates a future that is complete once all of the given futures have completed.
+	 * The future fails (completes exceptionally) once one of the given futures
+	 * fails.
+	 *
+	 * The ConjunctFuture gives access to how many Futures have already
+	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+	 *
+	 * @param futures The futures to wait on. No null entries are allowed.
+	 * @return The 

[3/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

2017-05-17 Thread trohrmann
http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 27603d0..f9052e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -43,7 +44,7 @@ import org.junit.Test;
 /**
  * This class contains unit tests for the {@link BlobClient} with ssl enabled.
  */
-public class BlobClientSslTest {
+public class BlobClientSslTest extends TestLogger {
 
/** The buffer size used during the tests in bytes. */
private static final int TEST_BUFFER_SIZE = 17 * 1000;
@@ -64,19 +65,14 @@ public class BlobClientSslTest {
	 * Starts the SSL enabled BLOB server.
	 */
	@BeforeClass
-	public static void startSSLServer() {
-		try {
-			Configuration config = new Configuration();
-			config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-			config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-			BLOB_SSL_SERVER = new BlobServer(config);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startSSLServer() throws IOException {
+		Configuration config = new Configuration();
+		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+
 
		sslClientConfig = new Configuration();
		sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -88,20 +84,14 @@ public class BlobClientSslTest {
	 * Starts the SSL disabled BLOB server.
	 */
	@BeforeClass
-	public static void startNonSSLServer() {
-		try {
-			Configuration config = new Configuration();
-			config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-			config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-			config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-			BLOB_SERVER = new BlobServer(config);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startNonSSLServer() throws IOException {
+		Configuration config = new Configuration();
+		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
 
		clientConfig = new Configuration();
		clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -114,13 +104,13 @@ public class BlobClientSslTest {
	 * Shuts the BLOB server down.
	 */
	@AfterClass
-	public static void stopServers() {
+	public static void stopServers() throws IOException {
		if (BLOB_SSL_SERVER != null) {
-  

[4/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

2017-05-17 Thread trohrmann
[FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

The HighAvailabilityService creates a single BlobStoreService instance which is
shared by all BlobServer and BlobCache instances. The BlobStoreService's
lifecycle is exclusively managed by the HighAvailabilityServices. This means
that the BlobStore's content is only cleaned up if the HighAvailabilityService's
HA data is cleaned up. Having this single point of control makes it easier to
decide when to discard HA data (e.g. in case of a successful job execution) and
when to retain the data (e.g. for recovery).

Close and cleanup all data of BlobStore in HighAvailabilityServices

Use HighAvailabilityServices to create BlobStore

Introduce BlobStoreService interface to hide close and closeAndCleanupAllData 
methods

This closes #3864.
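
A rough sketch of what the change means at the construction site, based on the
new two-argument BlobServer constructor visible in the diffs below; haServices
and its createBlobStore() call are assumptions here, not quoted from the commit:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.blob.BlobServer;
    import org.apache.flink.runtime.blob.BlobStore;
    import org.apache.flink.runtime.blob.VoidBlobStore;

    Configuration config = new Configuration();

    // Non-HA setups pass a no-op store; nothing is replicated and nothing
    // needs external cleanup.
    BlobServer standalone = new BlobServer(config, new VoidBlobStore());

    // HA setups would obtain the shared store from the HighAvailabilityServices,
    // which alone decides when the store's contents are cleaned up.
    BlobStore haStore = haServices.createBlobStore();
    BlobServer haServer = new BlobServer(config, haStore);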


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88b0f2ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88b0f2ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88b0f2ac

Branch: refs/heads/master
Commit: 88b0f2ac3fd788932d7f434ca57ba3718c3fa621
Parents: ebc1368
Author: Till Rohrmann 
Authored: Tue May 9 10:26:37 2017 +0200
Committer: Till Rohrmann 
Committed: Wed May 17 08:18:49 2017 +0200

--
 .../org/apache/flink/hdfstests/HDFSTest.java| 14 ++-
 .../clusterframework/MesosTaskManager.scala |  6 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 26 +-
 .../handlers/TaskManagerLogHandler.java | 11 ++-
 .../webmonitor/WebRuntimeMonitorITCase.java | 14 ++-
 .../handlers/TaskManagerLogHandlerTest.java | 11 ++-
 .../apache/flink/runtime/blob/BlobCache.java| 80 -
 .../apache/flink/runtime/blob/BlobServer.java   | 38 
 .../apache/flink/runtime/blob/BlobService.java  |  8 +-
 .../apache/flink/runtime/blob/BlobStore.java| 26 +-
 .../flink/runtime/blob/BlobStoreService.java| 32 +++
 .../apache/flink/runtime/blob/BlobUtils.java| 44 +++--
 .../org/apache/flink/runtime/blob/BlobView.java | 49 +++
 .../flink/runtime/blob/FileSystemBlobStore.java | 11 ++-
 .../flink/runtime/blob/VoidBlobStore.java   |  8 +-
 .../apache/flink/runtime/client/JobClient.java  | 13 ++-
 .../runtime/client/JobListeningContext.java |  6 +-
 .../clusterframework/BootstrapTools.java|  8 +-
 .../librarycache/BlobLibraryCacheManager.java   |  2 +-
 .../HighAvailabilityServicesUtils.java  | 12 ++-
 .../nonha/AbstractNonHaServices.java|  5 +-
 .../zookeeper/ZooKeeperHaServices.java  | 93 ++--
 .../runtime/jobmaster/JobManagerServices.java   |  2 +-
 .../runtime/taskexecutor/TaskExecutor.java  |  5 +-
 .../runtime/webmonitor/WebMonitorUtils.java | 21 +++--
 .../flink/runtime/jobmanager/JobManager.scala   | 21 ++---
 .../runtime/minicluster/FlinkMiniCluster.scala  |  8 +-
 .../minicluster/LocalFlinkMiniCluster.scala |  8 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 36 +---
 .../runtime/blob/BlobCacheRetriesTest.java  | 79 -
 .../runtime/blob/BlobCacheSuccessTest.java  | 56 ++--
 .../flink/runtime/blob/BlobClientSslTest.java   | 52 +--
 .../flink/runtime/blob/BlobClientTest.java  | 16 ++--
 .../flink/runtime/blob/BlobRecoveryITCase.java  | 21 +++--
 .../runtime/blob/BlobServerDeleteTest.java  | 21 +++--
 .../flink/runtime/blob/BlobServerGetTest.java   | 41 +++--
 .../flink/runtime/blob/BlobServerPutTest.java   | 89 +--
 .../flink/runtime/blob/BlobServerRangeTest.java | 10 +--
 .../runtime/blob/TestingFailingBlobServer.java  |  4 +-
 .../BlobLibraryCacheManagerTest.java| 33 +++
 .../BlobLibraryCacheRecoveryITCase.java | 21 +++--
 .../zookeeper/ZooKeeperRegistryTest.java|  7 +-
 .../jobmanager/JobManagerHARecoveryTest.java|  9 +-
 .../JobManagerLeaderElectionTest.java   |  3 +-
 .../ZooKeeperLeaderRetrievalTest.java   | 13 ++-
 .../runtime/metrics/TaskManagerMetricsTest.java |  2 +-
 ...askManagerComponentsStartupShutdownTest.java |  5 +-
 .../TaskManagerRegistrationTest.java|  5 +-
 .../src/test/resources/log4j-test.properties|  2 +-
 .../jobmanager/JobManagerRegistrationTest.scala |  3 +-
 .../testingUtils/TestingTaskManager.scala   | 40 -
 ...agerHAProcessFailureBatchRecoveryITCase.java |  1 +
 .../flink/yarn/TestingYarnTaskManager.scala | 25 +++---
 .../YarnHighAvailabilityServices.java   | 30 ++-
 .../org/apache/flink/yarn/YarnTaskManager.scala |  6 +-
 55 files changed, 667 insertions(+), 545 deletions(-)
--



[5/5] flink git commit: [FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion

2017-05-17 Thread trohrmann
[FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion

This commit introduces a BlobServer#readWriteLock in order to synchronize file
creation and deletion operations in BlobServerConnection and BlobServer. This
prevents multiple put and delete operations from interfering with each other
and with get operations.

The get operations are synchronized using the read lock in order to allow some
degree of parallelism.

Add Get and Delete operation tests

This closes #3888.
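
Illustrative only (not the actual BlobServerConnection code): the locking
discipline described above, with gets under the shared read lock and
puts/deletes under the exclusive write lock:

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    // GET: several readers may hold the read lock at once, so concurrent
    // downloads do not serialize each other.
    readWriteLock.readLock().lock();
    try {
        // ... stream the requested BLOB file to the client ...
    } finally {
        readWriteLock.readLock().unlock();
    }

    // PUT / DELETE: the write lock is exclusive, so file creation and deletion
    // cannot interleave with each other or with ongoing reads.
    readWriteLock.writeLock().lock();
    try {
        // ... create or delete the BLOB file ...
    } finally {
        readWriteLock.writeLock().unlock();
    }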


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ad489d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ad489d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ad489d8

Branch: refs/heads/master
Commit: 7ad489d87281b74c53d3b1a0dd97e56b7a8ef303
Parents: 88b0f2a
Author: Till Rohrmann 
Authored: Wed May 10 17:38:49 2017 +0200
Committer: Till Rohrmann 
Committed: Wed May 17 08:19:05 2017 +0200

--
 .../apache/flink/runtime/blob/BlobClient.java   |   2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  29 ++-
 .../runtime/blob/BlobServerConnection.java  | 240 +++
 .../runtime/blob/BlobServerDeleteTest.java  |  73 +-
 .../flink/runtime/blob/BlobServerGetTest.java   | 115 -
 .../flink/runtime/blob/BlobServerPutTest.java   | 109 -
 .../src/test/resources/log4j-test.properties|   2 +-
 7 files changed, 509 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 49e54a1..fab3c5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -537,7 +537,7 @@ public final class BlobClient implements Closeable {
			throw new IOException("Server side error: " + cause.getMessage(), cause);
		}
		else {
-			throw new IOException("Unrecognized response");
+			throw new IOException("Unrecognized response: " + response + '.');
		}
	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 937eab0..5ad4b6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -44,6 +44,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -85,6 +87,9 @@ public class BlobServer extends Thread implements BlobService {
	/** The maximum number of concurrent connections */
	private final int maxConnections;
 
+	/** Lock guarding concurrent file accesses */
+	private final ReadWriteLock readWriteLock;
+
	/**
	 * Shutdown hook thread to ensure deletion of the storage directory (or null if
	 * the configured high availability mode does not equal {@link HighAvailabilityMode#NONE})
@@ -104,6 +109,7 @@ public class BlobServer extends Thread implements BlobService {
	public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
		this.blobServiceConfiguration = checkNotNull(config);
		this.blobStore = checkNotNull(blobStore);
+		this.readWriteLock = new ReentrantReadWriteLock();
 
		// configure and create the storage directory
		String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
@@ -235,6 +241,13 @@ public class BlobServer extends Thread implements BlobService {
		return blobStore;
	}
 
+	/**
+	 * Returns the lock used to guard file accesses
+	 */
+	public ReadWriteLock getReadWriteLock() {
+		return readWriteLock;
+	}
+
	@Override
	public void 

[2/5] flink git commit: [FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore

2017-05-17 Thread trohrmann
[FLINK-6284] Correct sorting of completed checkpoints in 
ZooKeeperStateHandleStore

In order to store completed checkpoints in an increasing order in ZooKeeper,
the paths for the completed checkpoints are now generated by
String.format("/%019d", checkpointId) instead of String.format("/%s", checkpointId).
This makes sure that the converted long will always have the same length, with
leading 0s.

Fix failing ZooKeeperCompletedCheckpointStoreITCase

This closes #3884.
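
The padding matters because ZooKeeper returns child paths as strings and the
store sorts them lexicographically on recovery; with a fixed width of 19 digits
(enough for any positive long), string order equals numeric order:

    String.format("/%s", 9L);      // "/9"
    String.format("/%s", 10L);     // "/10"  -- sorts before "/9" as a string
    String.format("/%019d", 9L);   // "/0000000000000000009"
    String.format("/%019d", 10L);  // "/0000000000000000010" -- sorts after 9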


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ebc13688
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ebc13688
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ebc13688

Branch: refs/heads/master
Commit: ebc13688c9441adf61075707f40b361ee02597f3
Parents: c081201
Author: Till Rohrmann 
Authored: Fri May 12 14:23:37 2017 +0200
Committer: Till Rohrmann 
Committed: Wed May 17 08:18:27 2017 +0200

--
 .../ZooKeeperCompletedCheckpointStore.java  |  6 ++--
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 34 +---
 2 files changed, 33 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ebc13688/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c8c68bc..95cfb0f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -425,8 +425,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpointStore
	 * @param checkpointId to convert to the path
	 * @return Path created from the given checkpoint id
	 */
-	protected static String checkpointIdToPath(long checkpointId) {
-		return String.format("/%s", checkpointId);
+	public static String checkpointIdToPath(long checkpointId) {
+		return String.format("/%019d", checkpointId);
	}
 
	/**
@@ -435,7 +435,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpointStore
	 * @param path in ZooKeeper
	 * @return Checkpoint id parsed from the path
	 */
-	protected static long pathToCheckpointId(String path) {
+	public static long pathToCheckpointId(String path) {
		try {
			String numberString;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ebc13688/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 73e0ed9..3fd7f1b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -137,11 +137,11 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest
 
		store.addCheckpoint(checkpoint);
		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
		store.shutdown(JobStatus.FINISHED);
		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
		store.recover();
 
@@ -161,12 +161,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest
 
		store.addCheckpoint(checkpoint);
		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+

[2/5] flink git commit: [FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore

2017-05-17 Thread trohrmann
[FLINK-6284] Correct sorting of completed checkpoints in 
ZooKeeperStateHandleStore

In order to store completed checkpoints in an increasing order in ZooKeeper,
the paths for the completed checkpoints are now generated by
String.format("/%019d", checkpointId) instead of String.format("/%s", checkpointId).
This makes sure that the converted long will always have the same length, with
leading 0s.

Fix failing ZooKeeperCompletedCheckpointStoreITCase

This closes #3884.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/827d74e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/827d74e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/827d74e6

Branch: refs/heads/release-1.3
Commit: 827d74e69386cff87576972c1b69a16b92b730ae
Parents: 9c6c965
Author: Till Rohrmann 
Authored: Fri May 12 14:23:37 2017 +0200
Committer: Till Rohrmann 
Committed: Wed May 17 08:16:54 2017 +0200

--
 .../ZooKeeperCompletedCheckpointStore.java  |  6 ++--
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 34 +---
 2 files changed, 33 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c8c68bc..95cfb0f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -425,8 +425,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpointStore
	 * @param checkpointId to convert to the path
	 * @return Path created from the given checkpoint id
	 */
-	protected static String checkpointIdToPath(long checkpointId) {
-		return String.format("/%s", checkpointId);
+	public static String checkpointIdToPath(long checkpointId) {
+		return String.format("/%019d", checkpointId);
	}
 
	/**
@@ -435,7 +435,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpointStore
	 * @param path in ZooKeeper
	 * @return Checkpoint id parsed from the path
	 */
-	protected static long pathToCheckpointId(String path) {
+	public static long pathToCheckpointId(String path) {
		try {
			String numberString;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 73e0ed9..3fd7f1b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -137,11 +137,11 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest
 
		store.addCheckpoint(checkpoint);
		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
		store.shutdown(JobStatus.FINISHED);
		assertEquals(0, store.getNumberOfRetainedCheckpoints());
-		assertNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+		assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
		store.recover();
 
@@ -161,12 +161,12 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest
 
		store.addCheckpoint(checkpoint);
		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID()));
+

[4/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

2017-05-17 Thread trohrmann
[FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

The HighAvailabilityService creates a single BlobStoreService instance which is
shared by all BlobServer and BlobCache instances. The BlobStoreService's
lifecycle is exclusively managed by the HighAvailabilityServices. This means
that the BlobStore's content is only cleaned up if the HighAvailabilityService's
HA data is cleaned up. Having this single point of control makes it easier to
decide when to discard HA data (e.g. in case of a successful job execution) and
when to retain the data (e.g. for recovery).

Close and cleanup all data of BlobStore in HighAvailabilityServices

Use HighAvailabilityServices to create BlobStore

Introduce BlobStoreService interface to hide close and closeAndCleanupAllData 
methods

This closes #3864.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3ea89a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3ea89a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3ea89a9

Branch: refs/heads/release-1.3
Commit: e3ea89a9fab39e7595c466bcd90c30c338c86f4e
Parents: 827d74e
Author: Till Rohrmann 
Authored: Tue May 9 10:26:37 2017 +0200
Committer: Till Rohrmann 
Committed: Wed May 17 08:16:59 2017 +0200

--
 .../org/apache/flink/hdfstests/HDFSTest.java| 14 ++-
 .../clusterframework/MesosTaskManager.scala |  6 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 26 +-
 .../handlers/TaskManagerLogHandler.java | 11 ++-
 .../webmonitor/WebRuntimeMonitorITCase.java | 14 ++-
 .../handlers/TaskManagerLogHandlerTest.java | 11 ++-
 .../apache/flink/runtime/blob/BlobCache.java| 80 -
 .../apache/flink/runtime/blob/BlobServer.java   | 38 
 .../apache/flink/runtime/blob/BlobService.java  |  8 +-
 .../apache/flink/runtime/blob/BlobStore.java| 26 +-
 .../flink/runtime/blob/BlobStoreService.java| 32 +++
 .../apache/flink/runtime/blob/BlobUtils.java| 44 +++--
 .../org/apache/flink/runtime/blob/BlobView.java | 49 +++
 .../flink/runtime/blob/FileSystemBlobStore.java | 11 ++-
 .../flink/runtime/blob/VoidBlobStore.java   |  8 +-
 .../apache/flink/runtime/client/JobClient.java  | 13 ++-
 .../runtime/client/JobListeningContext.java |  6 +-
 .../clusterframework/BootstrapTools.java|  8 +-
 .../librarycache/BlobLibraryCacheManager.java   |  2 +-
 .../HighAvailabilityServicesUtils.java  | 12 ++-
 .../nonha/AbstractNonHaServices.java|  5 +-
 .../zookeeper/ZooKeeperHaServices.java  | 93 ++--
 .../runtime/jobmaster/JobManagerServices.java   |  2 +-
 .../runtime/taskexecutor/TaskExecutor.java  |  5 +-
 .../runtime/webmonitor/WebMonitorUtils.java | 21 +++--
 .../flink/runtime/jobmanager/JobManager.scala   | 21 ++---
 .../runtime/minicluster/FlinkMiniCluster.scala  |  8 +-
 .../minicluster/LocalFlinkMiniCluster.scala |  8 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 36 +---
 .../runtime/blob/BlobCacheRetriesTest.java  | 79 -
 .../runtime/blob/BlobCacheSuccessTest.java  | 56 ++--
 .../flink/runtime/blob/BlobClientSslTest.java   | 52 +--
 .../flink/runtime/blob/BlobClientTest.java  | 16 ++--
 .../flink/runtime/blob/BlobRecoveryITCase.java  | 21 +++--
 .../runtime/blob/BlobServerDeleteTest.java  | 21 +++--
 .../flink/runtime/blob/BlobServerGetTest.java   | 41 +++--
 .../flink/runtime/blob/BlobServerPutTest.java   | 89 +--
 .../flink/runtime/blob/BlobServerRangeTest.java | 10 +--
 .../runtime/blob/TestingFailingBlobServer.java  |  4 +-
 .../BlobLibraryCacheManagerTest.java| 33 +++
 .../BlobLibraryCacheRecoveryITCase.java | 21 +++--
 .../zookeeper/ZooKeeperRegistryTest.java|  7 +-
 .../jobmanager/JobManagerHARecoveryTest.java|  9 +-
 .../JobManagerLeaderElectionTest.java   |  3 +-
 .../ZooKeeperLeaderRetrievalTest.java   | 13 ++-
 .../runtime/metrics/TaskManagerMetricsTest.java |  2 +-
 ...askManagerComponentsStartupShutdownTest.java |  5 +-
 .../TaskManagerRegistrationTest.java|  5 +-
 .../src/test/resources/log4j-test.properties|  2 +-
 .../jobmanager/JobManagerRegistrationTest.scala |  3 +-
 .../testingUtils/TestingTaskManager.scala   | 40 -
 ...agerHAProcessFailureBatchRecoveryITCase.java |  1 +
 .../flink/yarn/TestingYarnTaskManager.scala | 25 +++---
 .../YarnHighAvailabilityServices.java   | 30 ++-
 .../org/apache/flink/yarn/YarnTaskManager.scala |  6 +-
 55 files changed, 667 insertions(+), 545 deletions(-)
--



[1/5] flink git commit: [FLINK-6555] [futures] Generalize ConjunctFuture to return results

2017-05-17 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/release-1.3 4104409cc -> 60873b0c5


[FLINK-6555] [futures] Generalize ConjunctFuture to return results

The ConjunctFuture now returns the set of future values once it is completed.

Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture

The WaitingConjunctFuture waits for the completion of its futures. The future
values are discarded, making it more efficient than the ResultConjunctFuture,
which returns the futures' values. The WaitingConjunctFuture is instantiated via
FutureUtils.waitForAll(Collection).

This closes #3873.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c6c9654
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c6c9654
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c6c9654

Branch: refs/heads/release-1.3
Commit: 9c6c9654e11dd31fa2323977fc02961811d6e518
Parents: 4104409
Author: Till Rohrmann 
Authored: Thu May 11 17:36:17 2017 +0200
Committer: Till Rohrmann 
Committed: Wed May 17 08:16:50 2017 +0200

--
 .../flink/runtime/concurrent/FutureUtils.java   | 131 +++
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +-
 .../executiongraph/ExecutionJobVertex.java  |   4 +-
 .../executiongraph/failover/FailoverRegion.java |   2 +-
 .../runtime/concurrent/FutureUtilsTest.java |  83 ++--
 5 files changed, 184 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 4948147..a27af56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -106,8 +109,9 @@ public class FutureUtils {
 
	/**
	 * Creates a future that is complete once multiple other futures completed.
-	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
-	 * conjunction fails.
+	 * The future fails (completes exceptionally) once one of the futures in the
+	 * conjunction fails. Upon successful completion, the future returns the
+	 * collection of the futures' results.
	 *
	 * The ConjunctFuture gives access to how many Futures in the conjunction have already
	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
@@ -115,16 +119,16 @@ public class FutureUtils {
	 * @param futures The futures that make up the conjunction. No null entries are allowed.
	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
	 */
-	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
+	public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends Future<? extends T>> futures) {
		checkNotNull(futures, "futures");
 
-		final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
+		final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size());
 
		if (futures.isEmpty()) {
-			conjunct.complete(null);
+			conjunct.complete(Collections.emptyList());
		}
		else {
-			for (Future<?> future : futures) {
+			for (Future<? extends T> future : futures) {
				future.handle(conjunct.completionHandler);
			}
		}
@@ -133,16 +137,32 @@ public class FutureUtils {
	}
 
	/**
+	 * Creates a future that is complete once all of the given futures have completed.
+	 * The future fails (completes exceptionally) once one of the given futures
+	 * fails.
+	 *
+	 * The ConjunctFuture gives access to how many Futures have already
+	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+	 *
+	 * @param futures The futures to wait on. No null entries are allowed.
+	 * @return The 

[3/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices

2017-05-17 Thread trohrmann
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 5054107..c106b3f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,7 +43,7 @@ import org.junit.Test;
 /**
  * This class contains unit tests for the {@link BlobClient} with ssl enabled.
  */
-public class BlobClientSslTest {
+public class BlobClientSslTest extends TestLogger {
 
/** The buffer size used during the tests in bytes. */
private static final int TEST_BUFFER_SIZE = 17 * 1000;
@@ -63,19 +64,14 @@ public class BlobClientSslTest {
	 * Starts the SSL enabled BLOB server.
	 */
	@BeforeClass
-	public static void startSSLServer() {
-		try {
-			Configuration config = new Configuration();
-			config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-			config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-			BLOB_SSL_SERVER = new BlobServer(config);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startSSLServer() throws IOException {
+		Configuration config = new Configuration();
+		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+
 
		sslClientConfig = new Configuration();
		sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -87,20 +83,14 @@ public class BlobClientSslTest {
	 * Starts the SSL disabled BLOB server.
	 */
	@BeforeClass
-	public static void startNonSSLServer() {
-		try {
-			Configuration config = new Configuration();
-			config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-			config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-			config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-			config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-			BLOB_SERVER = new BlobServer(config);
-		}
-		catch (IOException e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	public static void startNonSSLServer() throws IOException {
+		Configuration config = new Configuration();
+		config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+		config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+		config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+		BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
 
		clientConfig = new Configuration();
		clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -113,13 +103,13 @@ public class BlobClientSslTest {
	 * Shuts the BLOB server down.
	 */
	@AfterClass
-	public static void stopServers() {
+	public static void stopServers() throws IOException {
		if