flink git commit: [FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints
Repository: flink Updated Branches: refs/heads/release-1.3 98f4fad93 -> 8b5ba676f [FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints This commit adds the config snapshot of the key serializer of keyed backends to its checkpoints. This allows the opportunity to upgrade key serializers, as well as state migration in the future in the case of incompatible old and new key serializers. This closes #3925. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b5ba676 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b5ba676 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b5ba676 Branch: refs/heads/release-1.3 Commit: 8b5ba676fb3b52f69b0a523d57264462ac73c23f Parents: 98f4fad Author: Tzu-Li (Gordon) Tai Authored: Wed May 17 01:15:57 2017 +0800 Committer: Tzu-Li (Gordon) Tai Committed: Wed May 17 22:52:54 2017 +0800 -- .../state/RocksDBKeyedStateBackend.java | 30 ++ .../state/KeyedBackendSerializationProxy.java | 61 ++-- .../state/heap/HeapKeyedStateBackend.java | 15 + .../runtime/state/SerializationProxiesTest.java | 54 - .../runtime/state/StateBackendTestBase.java | 42 ++ 5 files changed, 197 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8b5ba676/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 4bd94fd..ddc7e17 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ 
-1116,6 +1116,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { * @throws ClassNotFoundException * @throws RocksDBException */ + @SuppressWarnings("unchecked") private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException { KeyedBackendSerializationProxy serializationProxy = @@ -1123,6 +1124,20 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { serializationProxy.read(currentStateHandleInView); + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (StateMigrationUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + (TypeSerializer) rocksDBKeyedStateBackend.keySerializer) + .isRequiresMigration()) { + + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new RuntimeException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); + } + List restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); @@ -1214,6 +1229,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { this.stateBackend = stateBackend; } + @SuppressWarnings("unchecked") private List readMetaData( StreamStateHandle metaStateHandle) throws Exception { @@ -1228,6 +1244,20 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { DataInputView in = new DataInputViewStreamWrapper(inputStream); serializationProxy.read(in); + // check for key serializer compatibility;
flink git commit: [FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints
Repository: flink Updated Branches: refs/heads/master 2bfead7d9 -> d8a467b01 [FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints This commit adds the config snapshot of the key serializer of keyed backends to its checkpoints. This allows the opportunity to upgrade key serializers, as well as state migration in the future in the case of incompatible old and new key serializers. This closes #3925. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8a467b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8a467b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8a467b0 Branch: refs/heads/master Commit: d8a467b01ab63127dbf563b6aa8c68fe5d9c85d4 Parents: 2bfead7 Author: Tzu-Li (Gordon) Tai Authored: Wed May 17 01:15:57 2017 +0800 Committer: Tzu-Li (Gordon) Tai Committed: Wed May 17 22:46:51 2017 +0800 -- .../state/RocksDBKeyedStateBackend.java | 30 ++ .../state/KeyedBackendSerializationProxy.java | 61 ++-- .../state/heap/HeapKeyedStateBackend.java | 15 + .../runtime/state/SerializationProxiesTest.java | 54 - .../runtime/state/StateBackendTestBase.java | 42 ++ 5 files changed, 197 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 4bd94fd..ddc7e17 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1116,6 +1116,7 
@@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { * @throws ClassNotFoundException * @throws RocksDBException */ + @SuppressWarnings("unchecked") private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException { KeyedBackendSerializationProxy serializationProxy = @@ -1123,6 +1124,20 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { serializationProxy.read(currentStateHandleInView); + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (StateMigrationUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + (TypeSerializer) rocksDBKeyedStateBackend.keySerializer) + .isRequiresMigration()) { + + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new RuntimeException("The new key serializer is not compatible to read previous keys. " + + "Aborting now since state migration is currently not available"); + } + List restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); @@ -1214,6 +1229,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { this.stateBackend = stateBackend; } + @SuppressWarnings("unchecked") private List readMetaData( StreamStateHandle metaStateHandle) throws Exception { @@ -1228,6 +1244,20 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { DataInputView in = new DataInputViewStreamWrapper(inputStream); serializationProxy.read(in); + // check for key serializer compatibility; this
[1/3] flink git commit: [FLINK-6598] [table] Remove unused parameter from DataStreamGroupAggregate.
Repository: flink Updated Branches: refs/heads/release-1.3 7a045f204 -> 98f4fad93 [FLINK-6598] [table] Remove unused parameter from DataStreamGroupAggregate. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/056d9553 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/056d9553 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/056d9553 Branch: refs/heads/release-1.3 Commit: 056d9553d713834d18c1b3490a6e3f106129a9ef Parents: 7a045f2 Author: sunjincheng121Authored: Tue May 16 19:08:11 2017 +0800 Committer: Fabian Hueske Committed: Wed May 17 15:29:40 2017 +0200 -- .../table/plan/nodes/datastream/DataStreamGroupAggregate.scala| 3 --- .../plan/rules/datastream/DataStreamGroupAggregateRule.scala | 1 - 2 files changed, 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/056d9553/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index e5d8088..d54c04b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory * @param traitSetTrait set of the RelNode * @param inputNode The input RelNode of aggregation * @param namedAggregates List of calls to aggregate functions and their output field names - * @param rowRelDataType The type of the rows of the RelNode * @param inputSchema The type of the rows consumed by this RelNode * @param schema The type of the rows emitted by this RelNode * @param groupings The position (in the 
input Row) of the grouping keys @@ -53,7 +52,6 @@ class DataStreamGroupAggregate( traitSet: RelTraitSet, inputNode: RelNode, namedAggregates: Seq[CalcitePair[AggregateCall, String]], -rowRelDataType: RelDataType, schema: RowSchema, inputSchema: RowSchema, groupings: Array[Int]) @@ -79,7 +77,6 @@ class DataStreamGroupAggregate( traitSet, inputs.get(0), namedAggregates, - getRowType, schema, inputSchema, groupings) http://git-wip-us.apache.org/repos/asf/flink/blob/056d9553/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala index fd7619c..0b8e411 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala @@ -68,7 +68,6 @@ class DataStreamGroupAggregateRule traitSet, convInput, agg.getNamedAggCalls, - rel.getRowType, new RowSchema(rel.getRowType), new RowSchema(agg.getInput.getRowType), agg.getGroupSet.toArray)
[2/3] flink git commit: [FLINK-6583] [table] Add state cleanup for counting GroupWindows.
[FLINK-6583] [table] Add state cleanup for counting GroupWindows. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36cac0fb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36cac0fb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36cac0fb Branch: refs/heads/release-1.3 Commit: 36cac0fb64ade6debc66f06e00d4184cdc0fba5f Parents: 056d955 Author: sunjincheng121Authored: Tue May 16 11:58:37 2017 +0800 Committer: Fabian Hueske Committed: Wed May 17 15:29:43 2017 +0200 -- .../DataStreamGroupWindowAggregate.scala| 32 +++- .../triggers/StateCleaningCountTrigger.scala| 136 + .../table/GroupWindowAggregationsITCase.scala | 10 +- .../StateCleaningCountTriggerHarnessTest.scala | 147 +++ 4 files changed, 316 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/36cac0fb/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index c158579..1ac013a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -25,7 +25,8 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners._ -import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} +import 
org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger +import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow} import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory @@ -40,7 +41,8 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger +import org.slf4j.LoggerFactory class DataStreamGroupWindowAggregate( window: LogicalWindow, @@ -54,6 +56,8 @@ class DataStreamGroupWindowAggregate( grouping: Array[Int]) extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel { + private val LOG = LoggerFactory.getLogger(this.getClass) + override def deriveRowType(): RelDataType = schema.logicalType override def needsUpdatesAsRetraction = true @@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate( "non-windowed GroupBy aggregation.") } +val isCountWindow = window match { + case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true + case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true + case _ => false +} + +if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) { + LOG.warn( +"No state retention interval configured for a query which accumulates state. " + +"Please provide a query configuration with valid retention interval to prevent excessive " + +"state size. 
You may specify a retention time of 0 to not clean up the state.") +} + val outRowType = CRowTypeInfo(schema.physicalTypeInfo) val aggString = aggregationToString( @@ -167,7 +184,7 @@ class DataStreamGroupWindowAggregate( val keyedStream = inputDS.keyBy(physicalGrouping: _*) val windowedStream = -createKeyedWindowedStream(window, keyedStream) +createKeyedWindowedStream(queryConfig, window, keyedStream) .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = @@ -192,7 +209,7 @@ class DataStreamGroupWindowAggregate( physicalNamedProperties) val windowedStream = -
[3/3] flink git commit: [FLINK-6589] [core] Deserialize ArrayList with capacity of size+1 to prevent growth.
[FLINK-6589] [core] Deserialize ArrayList with capacity of size+1 to prevent growth. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98f4fad9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98f4fad9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98f4fad9 Branch: refs/heads/release-1.3 Commit: 98f4fad93263b05f0e55562ddd81385430546225 Parents: 36cac0f Author: Fabian HueskeAuthored: Mon May 15 21:41:51 2017 +0200 Committer: Fabian Hueske Committed: Wed May 17 15:29:46 2017 +0200 -- .../apache/flink/api/common/typeutils/base/ListSerializer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/98f4fad9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java index aa9808e..1f271fe 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java @@ -82,7 +82,7 @@ public final class ListSerializer extends TypeSerializer { @Override public TypeSerializer
duplicate() { TypeSerializer duplicateElement = elementSerializer.duplicate(); - return duplicateElement == elementSerializer ? this : new ListSerializer(duplicateElement); + return duplicateElement == elementSerializer ? this : new ListSerializer<>(duplicateElement); } @Override @@ -129,7 +129,8 @@ public final class ListSerializer extends TypeSerializer
{ @Override public List deserialize(DataInputView source) throws IOException { final int size = source.readInt(); - final List list = new ArrayList<>(size); + // create new list with (size + 1) capacity to prevent expensive growth when a single element is added + final List list = new ArrayList<>(size + 1); for (int i = 0; i < size; i++) { list.add(elementSerializer.deserialize(source)); }
[1/3] flink git commit: [FLINK-6589] [core] Deserialize ArrayList with capacity of size+1 to prevent growth.
Repository: flink Updated Branches: refs/heads/master b6afc06ab -> 2bfead7d9 [FLINK-6589] [core] Deserialize ArrayList with capacity of size+1 to prevent growth. This closes #3912. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bfead7d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bfead7d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bfead7d Branch: refs/heads/master Commit: 2bfead7d9bef51713ed203fa7979f71f23525733 Parents: d85d969 Author: Fabian HueskeAuthored: Mon May 15 21:41:51 2017 +0200 Committer: Fabian Hueske Committed: Wed May 17 15:24:23 2017 +0200 -- .../apache/flink/api/common/typeutils/base/ListSerializer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2bfead7d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java index aa9808e..1f271fe 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java @@ -82,7 +82,7 @@ public final class ListSerializer extends TypeSerializer { @Override public TypeSerializer
duplicate() { TypeSerializer duplicateElement = elementSerializer.duplicate(); - return duplicateElement == elementSerializer ? this : new ListSerializer(duplicateElement); + return duplicateElement == elementSerializer ? this : new ListSerializer<>(duplicateElement); } @Override @@ -129,7 +129,8 @@ public final class ListSerializer extends TypeSerializer
{ @Override public List deserialize(DataInputView source) throws IOException { final int size = source.readInt(); - final List list = new ArrayList<>(size); + // create new list with (size + 1) capacity to prevent expensive growth when a single element is added + final List list = new ArrayList<>(size + 1); for (int i = 0; i < size; i++) { list.add(elementSerializer.deserialize(source)); }
[2/3] flink git commit: [FLINK-6583] [table] Add state cleanup for counting GroupWindows.
[FLINK-6583] [table] Add state cleanup for counting GroupWindows. This closes #3919. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d85d9693 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d85d9693 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d85d9693 Branch: refs/heads/master Commit: d85d969334e89d83aec60f9bb3d2c69a4701eb54 Parents: 64d3ce8 Author: sunjincheng121Authored: Tue May 16 11:58:37 2017 +0800 Committer: Fabian Hueske Committed: Wed May 17 15:24:23 2017 +0200 -- .../DataStreamGroupWindowAggregate.scala| 32 +++- .../triggers/StateCleaningCountTrigger.scala| 136 + .../table/GroupWindowAggregationsITCase.scala | 10 +- .../StateCleaningCountTriggerHarnessTest.scala | 147 +++ 4 files changed, 316 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d85d9693/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index c158579..1ac013a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -25,7 +25,8 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners._ -import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} +import 
org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger +import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, Window => DataStreamWindow} import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException} import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory @@ -40,7 +41,8 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger +import org.slf4j.LoggerFactory class DataStreamGroupWindowAggregate( window: LogicalWindow, @@ -54,6 +56,8 @@ class DataStreamGroupWindowAggregate( grouping: Array[Int]) extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel { + private val LOG = LoggerFactory.getLogger(this.getClass) + override def deriveRowType(): RelDataType = schema.logicalType override def needsUpdatesAsRetraction = true @@ -131,6 +135,19 @@ class DataStreamGroupWindowAggregate( "non-windowed GroupBy aggregation.") } +val isCountWindow = window match { + case TumblingGroupWindow(_, _, size) if isRowCountLiteral(size) => true + case SlidingGroupWindow(_, _, size, _) if isRowCountLiteral(size) => true + case _ => false +} + +if (isCountWindow && grouping.length > 0 && queryConfig.getMinIdleStateRetentionTime < 0) { + LOG.warn( +"No state retention interval configured for a query which accumulates state. " + +"Please provide a query configuration with valid retention interval to prevent excessive " + +"state size. 
You may specify a retention time of 0 to not clean up the state.") +} + val outRowType = CRowTypeInfo(schema.physicalTypeInfo) val aggString = aggregationToString( @@ -167,7 +184,7 @@ class DataStreamGroupWindowAggregate( val keyedStream = inputDS.keyBy(physicalGrouping: _*) val windowedStream = -createKeyedWindowedStream(window, keyedStream) +createKeyedWindowedStream(queryConfig, window, keyedStream) .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = @@ -192,7 +209,7 @@ class DataStreamGroupWindowAggregate( physicalNamedProperties) val windowedStream = -
flink git commit: [hotfix] Add configuration notice to HistoryServer overview
Repository: flink Updated Branches: refs/heads/release-1.3 849dd9d85 -> 7a045f204 [hotfix] Add configuration notice to HistoryServer overview Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a045f20 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a045f20 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a045f20 Branch: refs/heads/release-1.3 Commit: 7a045f20432cbaf1dcfa70206004fbee3a91d875 Parents: 849dd9d Author: twalthr Authored: Wed May 17 15:10:21 2017 +0200 Committer: twalthr Committed: Wed May 17 15:11:44 2017 +0200 -- docs/monitoring/historyserver.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7a045f20/docs/monitoring/historyserver.md -- diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md index fd957f2..f5f0019 100644 --- a/docs/monitoring/historyserver.md +++ b/docs/monitoring/historyserver.md @@ -33,7 +33,7 @@ Furthermore, it exposes a REST API that accepts HTTP requests and responds with The HistoryServer allows you to query the status and statistics of completed jobs that have been archived by a JobManager. -You start and stop the HistoryServer via its corresponding startup script: +After you have configured the HistoryServer *and* JobManager, you start and stop the HistoryServer via its corresponding startup script: ```sh # Start or stop the HistoryServer
flink git commit: [hotfix] Add configuration notice to HistoryServer overview
Repository: flink Updated Branches: refs/heads/master 00ce3f1b1 -> b6afc06ab [hotfix] Add configuration notice to HistoryServer overview Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6afc06a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6afc06a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6afc06a Branch: refs/heads/master Commit: b6afc06ab1b5c227254941a512912f830da62b5d Parents: 00ce3f1 Author: twalthr Authored: Wed May 17 15:10:21 2017 +0200 Committer: twalthr Committed: Wed May 17 15:10:21 2017 +0200 -- docs/monitoring/historyserver.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b6afc06a/docs/monitoring/historyserver.md -- diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md index fd957f2..f5f0019 100644 --- a/docs/monitoring/historyserver.md +++ b/docs/monitoring/historyserver.md @@ -33,7 +33,7 @@ Furthermore, it exposes a REST API that accepts HTTP requests and responds with The HistoryServer allows you to query the status and statistics of completed jobs that have been archived by a JobManager. -You start and stop the HistoryServer via its corresponding startup script: +After you have configured the HistoryServer *and* JobManager, you start and stop the HistoryServer via its corresponding startup script: ```sh # Start or stop the HistoryServer
[2/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map<String, List>.
[FLINK-6371] [cep] NFA return matched patterns as Map. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa64a60f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa64a60f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa64a60f Branch: refs/heads/release-1.3 Commit: fa64a60ff9229cd1c7723d95b8a1bf1a1eb2bd63 Parents: fe1316b Author: kl0u Authored: Fri May 5 13:55:07 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:40:23 2017 +0200 -- .../org/apache/flink/cep/CEPLambdaTest.java | 11 +- .../apache/flink/cep/scala/PatternStream.scala | 31 +- ...StreamScalaJavaAPIInteroperabilityTest.scala | 33 +- .../flink/cep/PatternFlatSelectFunction.java| 3 +- .../flink/cep/PatternFlatTimeoutFunction.java | 3 +- .../apache/flink/cep/PatternSelectFunction.java | 3 +- .../org/apache/flink/cep/PatternStream.java | 29 +- .../flink/cep/PatternTimeoutFunction.java | 3 +- .../main/java/org/apache/flink/cep/nfa/NFA.java | 109 +--- .../org/apache/flink/cep/nfa/SharedBuffer.java | 10 +- .../flink/cep/operator/CEPOperatorUtils.java| 19 +- .../cep/operator/KeyedCEPPatternOperator.java | 17 +- .../TimeoutKeyedCEPPatternOperator.java | 23 +- .../java/org/apache/flink/cep/CEPITCase.java| 69 +-- .../org/apache/flink/cep/nfa/NFAITCase.java | 608 +++ .../java/org/apache/flink/cep/nfa/NFATest.java | 62 +- .../apache/flink/cep/nfa/SharedBufferTest.java | 17 +- .../flink/cep/nfa/compiler/NFACompilerTest.java | 1 - .../cep/operator/CEPFrom12MigrationTest.java| 57 +- .../cep/operator/CEPMigration11to13Test.java| 21 +- .../flink/cep/operator/CEPOperatorTest.java | 41 +- .../flink/cep/operator/CEPRescalingTest.java| 31 +- 22 files changed, 474 insertions(+), 727 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java -- diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java 
b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java index 5957158..03fb3c6 100644 --- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java +++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java @@ -24,10 +24,13 @@ import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; +import org.junit.Ignore; import org.junit.Test; +import java.util.List; import java.util.Map; import static org.junit.Assert.*; @@ -41,6 +44,7 @@ public class CEPLambdaTest extends TestLogger { * Tests that a Java8 lambda can be passed as a CEP select function */ @Test + @Ignore public void testLambdaSelectFunction() { TypeInformation eventTypeInformation = TypeExtractor.getForClass(EventA.class); TypeInformation outputTypeInformation = TypeExtractor.getForClass(EventB.class); @@ -59,16 +63,17 @@ public class CEPLambdaTest extends TestLogger { PatternStream patternStream = new PatternStream<>(inputStream, dummyPattern); DataStream result = patternStream.select( - map -> new EventB() + (Map map) -> new EventB() ); assertEquals(outputTypeInformation, result.getType()); } /** -* Tests that a Java8 labmda can be passed as a CEP flat select function +* Tests that a Java8 lambda can be passed as a CEP flat select function */ @Test + @Ignore public void testLambdaFlatSelectFunction() { TypeInformation eventTypeInformation = TypeExtractor.getForClass(EventA.class); TypeInformation outputTypeInformation = TypeExtractor.getForClass(EventB.class); @@ -86,7 +91,7 @@ public class CEPLambdaTest extends TestLogger { PatternStream patternStream = new PatternStream<>(inputStream, dummyPattern); DataStream result = patternStream.flatSelect( - (map, collector) -> collector.collect(new 
EventB()) + (Map map, Collector collector) -> collector.collect(new EventB()) ); assertEquals(outputTypeInformation, result.getType());
[6/9] flink git commit: [FLINK-6578] [cep] Fix self-loop handling in SharedBuffer.
[FLINK-6578] [cep] Fix self-loop handling in SharedBuffer. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36df9019 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36df9019 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36df9019 Branch: refs/heads/release-1.3 Commit: 36df90196fcbb713175466d4b31574d570890262 Parents: 4560d56 Author: kkloudasAuthored: Mon May 15 14:33:09 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:40:26 2017 +0200 -- .../apache/flink/cep/nfa/ComputationState.java | 15 +- .../main/java/org/apache/flink/cep/nfa/NFA.java | 34 +- .../org/apache/flink/cep/nfa/SharedBuffer.java | 117 +++--- .../java/org/apache/flink/cep/nfa/State.java| 6 +- .../apache/flink/cep/nfa/StateTransition.java | 21 +- .../org/apache/flink/cep/nfa/NFAITCase.java | 364 +++ .../apache/flink/cep/nfa/SharedBufferTest.java | 78 ++-- .../cep/operator/CEPFrom12MigrationTest.java| 3 + .../cep/operator/CEPMigration11to13Test.java| 3 + .../flink/cep/operator/CEPOperatorTest.java | 201 ++ 10 files changed, 728 insertions(+), 114 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/36df9019/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java index 08b9b78..44f8f39 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java @@ -40,6 +40,8 @@ public class ComputationState { // the last taken event private final T event; + private final int counter; + // timestamp of the last taken event private final long timestamp; @@ -58,11 +60,13 @@ public class ComputationState { final State currentState, final State previousState, final T 
event, + final int counter, final long timestamp, final DeweyNumber version, final long startTimestamp) { this.state = currentState; this.event = event; + this.counter = counter; this.timestamp = timestamp; this.version = version; this.startTimestamp = startTimestamp; @@ -70,6 +74,10 @@ public class ComputationState { this.conditionContext = new ConditionContext(nfa, this); } + public int getCounter() { + return counter; + } + public ConditionContext getConditionContext() { return conditionContext; } @@ -108,12 +116,12 @@ public class ComputationState { public static ComputationState createStartState(final NFA nfa, final State state) { Preconditions.checkArgument(state.isStart()); - return new ComputationState<>(nfa, state, null, null, -1L, new DeweyNumber(1), -1L); + return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L); } public static ComputationState createStartState(final NFA nfa, final State state, final DeweyNumber version) { Preconditions.checkArgument(state.isStart()); - return new ComputationState<>(nfa, state, null, null, -1L, version, -1L); + return new ComputationState<>(nfa, state, null, null, 0, -1L, version, -1L); } public static ComputationState createState( @@ -121,10 +129,11 @@ public class ComputationState { final State currentState, final State previousState, final T event, + final int counter, final long timestamp, final DeweyNumber version, final long startTimestamp) { - return new ComputationState<>(nfa, currentState, previousState, event, timestamp, version, startTimestamp); + return new ComputationState<>(nfa, currentState, previousState, event, counter, timestamp, version, startTimestamp); } public boolean isStopState() { http://git-wip-us.apache.org/repos/asf/flink/blob/36df9019/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java -- diff --git
[4/9] flink git commit: [FLINK-6255] [cep] Remove PatternStream.getSideOutput().
[FLINK-6255] [cep] Remove PatternStream.getSideOutput(). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4560d56c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4560d56c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4560d56c Branch: refs/heads/release-1.3 Commit: 4560d56c6fdface683116dc2db5d7a4942a8b6e3 Parents: 4f14e53 Author: kl0uAuthored: Fri May 12 16:01:38 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:40:25 2017 +0200 -- docs/dev/libs/cep.md| 41 +--- .../apache/flink/cep/scala/PatternStream.scala | 35 +-- .../org/apache/flink/cep/PatternStream.java | 55 +-- .../AbstractKeyedCEPPatternOperator.java| 24 - .../flink/cep/operator/CEPOperatorUtils.java| 9 +- .../cep/operator/KeyedCEPPatternOperator.java | 4 +- .../TimeoutKeyedCEPPatternOperator.java | 4 +- .../java/org/apache/flink/cep/CEPITCase.java| 98 +--- .../cep/operator/CEPFrom12MigrationTest.java| 6 -- .../cep/operator/CEPMigration11to13Test.java| 2 - .../flink/cep/operator/CEPOperatorTest.java | 2 - .../flink/cep/operator/CEPRescalingTest.java| 1 - 12 files changed, 13 insertions(+), 268 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/docs/dev/libs/cep.md -- diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index b379615..58e1a0a 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -806,46 +806,7 @@ in event time. To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last -seen watermark. Late elements are not further processed but they can be redirected to a [side output] -({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them. 
- -To access the stream of late elements, you first need to specify that you want to get the late data using -`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do -so, the late elements will be silently dropped. Then, you can get the side-output stream using the -`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and providing as argument the output tag used in -the `.sideOutputLateData(OutputTag)`: - - - -{% highlight java %} -final OutputTag lateOutputTag = new OutputTag("late-data"){}; - -PatternStream patternStream = CEP.pattern(...) -.sideOutputLateData(lateOutputTag); - -// main output with matches -DataStream result = patternStream.select(...) - -// side output containing the late events -DataStream lateStream = patternStream.getSideOutput(lateOutputTag); -{% endhighlight %} - - - -{% highlight scala %} -val lateOutputTag = OutputTag[T]("late-data") - -val patternStream: PatternStream[T] = CEP.pattern(...) -.sideOutputLateData(lateOutputTag) - -// main output with matches -val result = patternStream.select(...) - -// side output containing the late events -val lateStream = patternStream.getSideOutput(lateOutputTag) -{% endhighlight %} - - +seen watermark. Late elements are not further processed. 
## Examples http://git-wip-us.apache.org/repos/asf/flink/blob/4560d56c/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala -- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala index d4bc28c..e71439c 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala @@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream} import org.apache.flink.cep.pattern.{Pattern => JPattern} import org.apache.flink.streaming.api.scala.{asScalaStream, _} -import org.apache.flink.util.{Collector, OutputTag} +import org.apache.flink.util.Collector import org.apache.flink.types.{Either => FEither} import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2} import java.lang.{Long => JLong} -import org.apache.flink.annotation.PublicEvolving import org.apache.flink.cep.operator.CEPOperatorUtils import org.apache.flink.cep.scala.pattern.Pattern
[9/9] flink git commit: [FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs.
[FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/849dd9d8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/849dd9d8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/849dd9d8 Branch: refs/heads/release-1.3 Commit: 849dd9d8588ec8b1971c2c2b3f0a07f87715741b Parents: d80af81 Author: Dawid WysakowiczAuthored: Wed May 17 09:16:08 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:40:28 2017 +0200 -- .../main/java/org/apache/flink/cep/nfa/NFA.java | 11 ++- .../org/apache/flink/cep/nfa/NFAITCase.java | 82 2 files changed, 89 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/849dd9d8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index ab5cd8e..a977a7f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -418,6 +418,7 @@ public class NFA implements Serializable { final List edges = outgoingEdges.getEdges(); int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches(); + int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); final List resultingComputationStates = new ArrayList<>(); for (StateTransition edge : edges) { @@ -433,7 +434,9 @@ public class NFA implements Serializable { version = computationState.getVersion().increase(toIncrease); } else { //IGNORE after PROCEED - version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage(); + version = computationState.getVersion() + .increase(totalTakeToSkip + ignoreBranchesToVisit) + .addStage(); 
ignoreBranchesToVisit--; } @@ -457,8 +460,8 @@ public class NFA implements Serializable { final T previousEvent = computationState.getEvent(); - final DeweyNumber currentVersion = computationState.getVersion(); - final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit); + final DeweyNumber currentVersion = computationState.getVersion().increase(takeBranchesToVisit); + final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage(); takeBranchesToVisit--; final int counter; @@ -573,7 +576,7 @@ public class NFA implements Serializable { } private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { - return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1; + return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches); } private OutgoingEdges createDecisionGraph(ComputationState computationState, T event) { http://git-wip-us.apache.org/repos/asf/flink/blob/849dd9d8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 012e112..d00bbb7 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -3974,6 +3974,88 @@ public class NFAITCase extends TestLogger { } @Test + public void testMultipleTakesVersionCollision() { + List inputEvents
[7/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.
http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 88a5703..824df2d 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -32,9 +32,9 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.junit.Ignore; import org.junit.Test; import java.net.URL; @@ -57,7 +57,6 @@ public class CEPMigration11to13Test { } @Test - @Ignore public void testKeyedCEPOperatorMigratation() throws Exception { KeySelectorkeySelector = new KeySelector () { @@ -136,11 +135,58 @@ public class CEPMigration11to13Test { assertEquals(middleEvent, patternMap.get("middle").get(0)); assertEquals(endEvent, patternMap.get("end").get(0)); + // and now go for a checkpoint with the new serializers + + final Event startEvent1 = new Event(42, "start", 2.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0); + final Event endEvent1 = new Event(42, "end", 2.0); + + harness.processElement(new StreamRecord(startEvent1, 21)); + harness.processElement(new StreamRecord(middleEvent1, 23)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles 
snapshot = harness.snapshot(1L, 1L); + harness.close(); + + harness = new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(endEvent1, 25)); + + harness.processWatermark(new Watermark(50)); + + result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord resultRecord1 = (StreamRecord) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map patternMap1 = (Map ) resultRecord1.getValue(); + + assertEquals(startEvent1, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent1, patternMap1.get("end").get(0)); + harness.close(); } @Test - @Ignore public void testNonKeyedCEPFunctionMigration() throws Exception { final Event startEvent = new Event(42, "start", 1.0); @@ -191,7 +237,7 @@ public class CEPMigration11to13Test { harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); harness.processElement(new StreamRecord<>(endEvent, 5)); - harness.processWatermark(new Watermark(Long.MAX_VALUE)); + harness.processWatermark(new Watermark(20)); ConcurrentLinkedQueue result = harness.getOutput(); @@ -210,6 +256,54 @@ public class CEPMigration11to13Test { assertEquals(middleEvent, patternMap.get("middle").get(0)); assertEquals(endEvent, patternMap.get("end").get(0)); +
[8/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.
[FLINK-6604] [cep] Remove java serialization from the library. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80af819 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80af819 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80af819 Branch: refs/heads/release-1.3 Commit: d80af81972ba5adf291d891881aee26b97ec7a60 Parents: 34a6020 Author: kkloudasAuthored: Tue May 16 17:07:29 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:40:27 2017 +0200 -- .../org/apache/flink/cep/nfa/DeweyNumber.java | 98 ++- .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++-- .../org/apache/flink/cep/nfa/SharedBuffer.java | 596 ++- .../java/org/apache/flink/cep/nfa/State.java| 2 + .../flink/cep/nfa/compiler/NFACompiler.java | 81 ++- .../AbstractKeyedCEPPatternOperator.java| 16 +- .../java/org/apache/flink/cep/nfa/NFATest.java | 182 -- .../apache/flink/cep/nfa/SharedBufferTest.java | 14 +- .../cep/operator/CEPFrom12MigrationTest.java| 99 ++- .../cep/operator/CEPMigration11to13Test.java| 102 +++- .../flink/cep/operator/CEPOperatorTest.java | 110 +++- 11 files changed, 1499 insertions(+), 318 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java index fd3fafa..3827956 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java @@ -18,6 +18,13 @@ package org.apache.flink.cep.nfa; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import 
org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; @@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable { deweyNumber = new int[]{start}; } - protected DeweyNumber(int[] deweyNumber) { - this.deweyNumber = deweyNumber; - } - public DeweyNumber(DeweyNumber number) { this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length); } + private DeweyNumber(int[] deweyNumber) { + this.deweyNumber = deweyNumber; + } + /** * Checks whether this dewey number is compatible to the other dewey number. * @@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable { return new DeweyNumber(deweyNumber); } } + + /** +* A {@link TypeSerializer} for the {@link DeweyNumber} which serves as a version number. +*/ + public static class DeweyNumberSerializer extends TypeSerializerSingleton { + + private static final long serialVersionUID = -5086792497034943656L; + + private final IntSerializer elemSerializer = IntSerializer.INSTANCE; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public DeweyNumber createInstance() { + return new DeweyNumber(1); + } + + @Override + public DeweyNumber copy(DeweyNumber from) { + return new DeweyNumber(from); + } + + @Override + public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(DeweyNumber record, DataOutputView target) throws IOException { + final int size = record.length(); + target.writeInt(size); + for (int i = 0; i < size; i++) { + elemSerializer.serialize(record.deweyNumber[i], target); + } + } + + @Override + public DeweyNumber deserialize(DataInputView source) throws IOException { + final
[1/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map<String, List<T>>.
Repository: flink Updated Branches: refs/heads/release-1.3 fe1316b33 -> 849dd9d85 http://git-wip-us.apache.org/repos/asf/flink/blob/fa64a60f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 2cc67e5..46e2fd4 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.google.common.primitives.Doubles; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.Event; @@ -156,22 +155,11 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List
[3/9] flink git commit: [FLINK-6536] [cep] Improve error message in SharedBuffer::put().
[FLINK-6536] [cep] Improve error message in SharedBuffer::put(). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f14e53b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f14e53b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f14e53b Branch: refs/heads/release-1.3 Commit: 4f14e53b877eab204d4d970d29f886d8fcc0034b Parents: fa64a60 Author: kl0u Authored: Thu May 11 11:39:00 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:40:24 2017 +0200 -- .../src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4f14e53b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index 418bd4a..decf577 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -97,10 +97,11 @@ public class SharedBuffer implements Serializable { // sanity check whether we've found the previous element if (previousSharedBufferEntry == null && previousValue != null) { - throw new IllegalStateException("Could not find previous shared buffer entry with " + + throw new IllegalStateException("Could not find previous entry with " + "key: " + previousKey + ", value: " + previousValue + " and timestamp: " + - previousTimestamp + ". This can indicate that the element belonging to the previous " + - "relation has been already pruned, even though you expect it to be still there."); + previousTimestamp + ". 
This can indicate that either you did not implement " + + "the equals() and hashCode() methods of your input elements properly or that " + + "the element belonging to that entry has been already pruned."); } put(key, value, timestamp, previousSharedBufferEntry, version);
[5/9] flink git commit: [hotfix] [cep] Remove unused keySelector in operator.
[hotfix] [cep] Remove unused keySelector in operator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34a6020f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34a6020f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34a6020f Branch: refs/heads/release-1.3 Commit: 34a6020ff6d51cd8148366677fadb9317cfaa153 Parents: 36df901 Author: kkloudasAuthored: Mon May 15 14:49:00 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:40:26 2017 +0200 -- .../cep/operator/AbstractKeyedCEPPatternOperator.java | 6 -- .../org/apache/flink/cep/operator/CEPOperatorUtils.java | 6 -- .../flink/cep/operator/KeyedCEPPatternOperator.java | 4 +--- .../cep/operator/TimeoutKeyedCEPPatternOperator.java| 4 +--- .../flink/cep/operator/CEPFrom12MigrationTest.java | 6 -- .../flink/cep/operator/CEPMigration11to13Test.java | 2 -- .../org/apache/flink/cep/operator/CEPOperatorTest.java | 12 .../org/apache/flink/cep/operator/CEPRescalingTest.java | 1 - 8 files changed, 6 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 7068bc4..bac21b3 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import 
org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.core.fs.FSDataInputStream; @@ -78,9 +77,6 @@ public abstract class AbstractKeyedCEPPatternOperator private final TypeSerializer inputSerializer; - // necessary to extract the key from the input elements - private final KeySelector keySelector; - // necessary to serialize the set of seen keys private final TypeSerializer keySerializer; @@ -112,14 +108,12 @@ public abstract class AbstractKeyedCEPPatternOperator public AbstractKeyedCEPPatternOperator( final TypeSerializer inputSerializer, final boolean isProcessingTime, - final KeySelector keySelector, final TypeSerializer keySerializer, final NFACompiler.NFAFactory nfaFactory, final boolean migratingFromOldKeyedOperator) { this.inputSerializer = Preconditions.checkNotNull(inputSerializer); this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime); - this.keySelector = Preconditions.checkNotNull(keySelector); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.nfaFactory = Preconditions.checkNotNull(nfaFactory); http://git-wip-us.apache.org/repos/asf/flink/blob/34a6020f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index 08424a4..e7b7e65 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -63,7 +63,6 @@ public class CEPOperatorUtils { // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams KeyedStream keyedStream= 
(KeyedStream ) inputStream; - KeySelector keySelector = keyedStream.getKeySelector(); TypeSerializer keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
[8/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.
[FLINK-6604] [cep] Remove java serialization from the library. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a54d05e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a54d05e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a54d05e Branch: refs/heads/master Commit: 7a54d05ecd33b6dc140a7146f0efa90d64471f47 Parents: f7ebcb0 Author: kkloudasAuthored: Tue May 16 17:07:29 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:37:34 2017 +0200 -- .../org/apache/flink/cep/nfa/DeweyNumber.java | 98 ++- .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++-- .../org/apache/flink/cep/nfa/SharedBuffer.java | 596 ++- .../java/org/apache/flink/cep/nfa/State.java| 2 + .../flink/cep/nfa/compiler/NFACompiler.java | 81 ++- .../AbstractKeyedCEPPatternOperator.java| 16 +- .../java/org/apache/flink/cep/nfa/NFATest.java | 182 -- .../apache/flink/cep/nfa/SharedBufferTest.java | 14 +- .../cep/operator/CEPFrom12MigrationTest.java| 99 ++- .../cep/operator/CEPMigration11to13Test.java| 102 +++- .../flink/cep/operator/CEPOperatorTest.java | 110 +++- 11 files changed, 1499 insertions(+), 318 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java index fd3fafa..3827956 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java @@ -18,6 +18,13 @@ package org.apache.flink.cep.nfa; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import 
org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; @@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable { deweyNumber = new int[]{start}; } - protected DeweyNumber(int[] deweyNumber) { - this.deweyNumber = deweyNumber; - } - public DeweyNumber(DeweyNumber number) { this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length); } + private DeweyNumber(int[] deweyNumber) { + this.deweyNumber = deweyNumber; + } + /** * Checks whether this dewey number is compatible to the other dewey number. * @@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable { return new DeweyNumber(deweyNumber); } } + + /** +* A {@link TypeSerializer} for the {@link DeweyNumber} which serves as a version number. +*/ + public static class DeweyNumberSerializer extends TypeSerializerSingleton { + + private static final long serialVersionUID = -5086792497034943656L; + + private final IntSerializer elemSerializer = IntSerializer.INSTANCE; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public DeweyNumber createInstance() { + return new DeweyNumber(1); + } + + @Override + public DeweyNumber copy(DeweyNumber from) { + return new DeweyNumber(from); + } + + @Override + public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(DeweyNumber record, DataOutputView target) throws IOException { + final int size = record.length(); + target.writeInt(size); + for (int i = 0; i < size; i++) { + elemSerializer.serialize(record.deweyNumber[i], target); + } + } + + @Override + public DeweyNumber deserialize(DataInputView source) throws IOException { + final int
[9/9] flink git commit: [FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs.
[FLINK-6609] [cep] Fix wrong version assignment with multiple TAKEs. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/00ce3f1b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/00ce3f1b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/00ce3f1b Branch: refs/heads/master Commit: 00ce3f1b12c7d7bf996d5f91bf006f0e18a719e7 Parents: 7a54d05 Author: Dawid WysakowiczAuthored: Wed May 17 09:16:08 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:37:35 2017 +0200 -- .../main/java/org/apache/flink/cep/nfa/NFA.java | 11 ++- .../org/apache/flink/cep/nfa/NFAITCase.java | 82 2 files changed, 89 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/00ce3f1b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index ab5cd8e..a977a7f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -418,6 +418,7 @@ public class NFA implements Serializable { final List edges = outgoingEdges.getEdges(); int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches(); + int totalTakeToSkip = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); final List resultingComputationStates = new ArrayList<>(); for (StateTransition edge : edges) { @@ -433,7 +434,9 @@ public class NFA implements Serializable { version = computationState.getVersion().increase(toIncrease); } else { //IGNORE after PROCEED - version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage(); + version = computationState.getVersion() + .increase(totalTakeToSkip + ignoreBranchesToVisit) + .addStage(); 
ignoreBranchesToVisit--; } @@ -457,8 +460,8 @@ public class NFA implements Serializable { final T previousEvent = computationState.getEvent(); - final DeweyNumber currentVersion = computationState.getVersion(); - final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit); + final DeweyNumber currentVersion = computationState.getVersion().increase(takeBranchesToVisit); + final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage(); takeBranchesToVisit--; final int counter; @@ -573,7 +576,7 @@ public class NFA implements Serializable { } private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { - return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1; + return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches); } private OutgoingEdges createDecisionGraph(ComputationState computationState, T event) { http://git-wip-us.apache.org/repos/asf/flink/blob/00ce3f1b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 012e112..d00bbb7 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -3974,6 +3974,88 @@ public class NFAITCase extends TestLogger { } @Test + public void testMultipleTakesVersionCollision() { + List inputEvents =
[4/9] flink git commit: [FLINK-6536] [cep] Improve error message in SharedBuffer::put().
[FLINK-6536] [cep] Improve error message in SharedBuffer::put(). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/02ea418f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/02ea418f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/02ea418f Branch: refs/heads/master Commit: 02ea418fb307876e1b957cad6be619a4d035d829 Parents: ae9c9d0 Author: kl0uAuthored: Thu May 11 11:39:00 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:37:32 2017 +0200 -- .../src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/02ea418f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index 418bd4a..decf577 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -97,10 +97,11 @@ public class SharedBuffer implements Serializable { // sanity check whether we've found the previous element if (previousSharedBufferEntry == null && previousValue != null) { - throw new IllegalStateException("Could not find previous shared buffer entry with " + + throw new IllegalStateException("Could not find previous entry with " + "key: " + previousKey + ", value: " + previousValue + " and timestamp: " + - previousTimestamp + ". This can indicate that the element belonging to the previous " + - "relation has been already pruned, even though you expect it to be still there."); + previousTimestamp + ". 
This can indicate that either you did not implement " + + "the equals() and hashCode() methods of your input elements properly or that " + + "the element belonging to that entry has been already pruned."); } put(key, value, timestamp, previousSharedBufferEntry, version);
[7/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.
http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java index 88a5703..824df2d 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java @@ -32,9 +32,9 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.junit.Ignore; import org.junit.Test; import java.net.URL; @@ -57,7 +57,6 @@ public class CEPMigration11to13Test { } @Test - @Ignore public void testKeyedCEPOperatorMigratation() throws Exception { KeySelectorkeySelector = new KeySelector () { @@ -136,11 +135,58 @@ public class CEPMigration11to13Test { assertEquals(middleEvent, patternMap.get("middle").get(0)); assertEquals(endEvent, patternMap.get("end").get(0)); + // and now go for a checkpoint with the new serializers + + final Event startEvent1 = new Event(42, "start", 2.0); + final SubEvent middleEvent1 = new SubEvent(42, "foo", 1.0, 11.0); + final Event endEvent1 = new Event(42, "end", 2.0); + + harness.processElement(new StreamRecord(startEvent1, 21)); + harness.processElement(new StreamRecord(middleEvent1, 23)); + + // simulate snapshot/restore with some elements in internal sorting queue + OperatorStateHandles 
snapshot = harness.snapshot(1L, 1L); + harness.close(); + + harness = new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + IntSerializer.INSTANCE, + new NFAFactory(), + true), + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + + harness.setup(); + harness.initializeState(snapshot); + harness.open(); + + harness.processElement(new StreamRecord<>(endEvent1, 25)); + + harness.processWatermark(new Watermark(50)); + + result = harness.getOutput(); + + // watermark and the result + assertEquals(2, result.size()); + + Object resultObject1 = result.poll(); + assertTrue(resultObject1 instanceof StreamRecord); + StreamRecord resultRecord1 = (StreamRecord) resultObject1; + assertTrue(resultRecord1.getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map patternMap1 = (Map ) resultRecord1.getValue(); + + assertEquals(startEvent1, patternMap1.get("start").get(0)); + assertEquals(middleEvent1, patternMap1.get("middle").get(0)); + assertEquals(endEvent1, patternMap1.get("end").get(0)); + harness.close(); } @Test - @Ignore public void testNonKeyedCEPFunctionMigration() throws Exception { final Event startEvent = new Event(42, "start", 1.0); @@ -191,7 +237,7 @@ public class CEPMigration11to13Test { harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4)); harness.processElement(new StreamRecord<>(endEvent, 5)); - harness.processWatermark(new Watermark(Long.MAX_VALUE)); + harness.processWatermark(new Watermark(20)); ConcurrentLinkedQueue result = harness.getOutput(); @@ -210,6 +256,54 @@ public class CEPMigration11to13Test { assertEquals(middleEvent, patternMap.get("middle").get(0)); assertEquals(endEvent, patternMap.get("end").get(0)); +
[5/9] flink git commit: [hotfix] [cep] Remove unused keySelector in operator.
[hotfix] [cep] Remove unused keySelector in operator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7ebcb07 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7ebcb07 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7ebcb07 Branch: refs/heads/master Commit: f7ebcb07edecc06f523e4f46dfaa3ec10d1e90c8 Parents: 8e4db42 Author: kkloudasAuthored: Mon May 15 14:49:00 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:37:33 2017 +0200 -- .../cep/operator/AbstractKeyedCEPPatternOperator.java | 6 -- .../org/apache/flink/cep/operator/CEPOperatorUtils.java | 6 -- .../flink/cep/operator/KeyedCEPPatternOperator.java | 4 +--- .../cep/operator/TimeoutKeyedCEPPatternOperator.java| 4 +--- .../flink/cep/operator/CEPFrom12MigrationTest.java | 6 -- .../flink/cep/operator/CEPMigration11to13Test.java | 2 -- .../org/apache/flink/cep/operator/CEPOperatorTest.java | 12 .../org/apache/flink/cep/operator/CEPRescalingTest.java | 1 - 8 files changed, 6 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 7068bc4..bac21b3 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import 
org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.core.fs.FSDataInputStream; @@ -78,9 +77,6 @@ public abstract class AbstractKeyedCEPPatternOperator private final TypeSerializer inputSerializer; - // necessary to extract the key from the input elements - private final KeySelector keySelector; - // necessary to serialize the set of seen keys private final TypeSerializer keySerializer; @@ -112,14 +108,12 @@ public abstract class AbstractKeyedCEPPatternOperator public AbstractKeyedCEPPatternOperator( final TypeSerializer inputSerializer, final boolean isProcessingTime, - final KeySelector keySelector, final TypeSerializer keySerializer, final NFACompiler.NFAFactory nfaFactory, final boolean migratingFromOldKeyedOperator) { this.inputSerializer = Preconditions.checkNotNull(inputSerializer); this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime); - this.keySelector = Preconditions.checkNotNull(keySelector); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.nfaFactory = Preconditions.checkNotNull(nfaFactory); http://git-wip-us.apache.org/repos/asf/flink/blob/f7ebcb07/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index 08424a4..e7b7e65 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -63,7 +63,6 @@ public class CEPOperatorUtils { // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams KeyedStream keyedStream= 
(KeyedStream ) inputStream; - KeySelector keySelector = keyedStream.getKeySelector(); TypeSerializer keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
[1/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map<String, List<T>>.
Repository: flink Updated Branches: refs/heads/master 9244106b3 -> 00ce3f1b1 http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java -- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java index 2cc67e5..46e2fd4 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.cep.nfa; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.google.common.primitives.Doubles; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.Event; @@ -156,22 +155,11 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - List> resultingPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent: inputEvents) { - Collection > patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - resultingPatterns.addAll(patterns); - } - - assertEquals(1, resultingPatterns.size()); - Map patternMap = resultingPatterns.get(0); + List resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(startEvent, patternMap.get("start")); - assertEquals(middleEvent, patternMap.get("middle")); - assertEquals(endEvent, patternMap.get("end")); + compareMaps(resultingPatterns, Lists.
newArrayList( + Lists.newArrayList(startEvent, middleEvent, endEvent) + )); } @Test @@ -202,24 +190,11 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set
resultingPatterns = new HashSet<>(); - List allPatterns = new ArrayList<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection > patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - allPatterns.add(foundPattern.values()); - } - } + List resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(1, allPatterns.size()); - assertEquals(Sets.
newHashSet( - Sets.newHashSet(middleEvent1, end) - ), resultingPatterns); + compareMaps(resultingPatterns, Lists. newArrayList( + Lists.newArrayList(middleEvent1, end) + )); } @Test @@ -252,19 +227,9 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - Set
resultingPatterns = new HashSet<>(); - - for (StreamRecord inputEvent : inputEvents) { - Collection > patterns = nfa.process( - inputEvent.getValue(), - inputEvent.getTimestamp()).f0; - - for (Map foundPattern : patterns) { - resultingPatterns.add(new HashSet<>(foundPattern.values())); - } - } + List resultingPatterns = feedNFA(inputEvents, nfa); - assertEquals(Sets.newHashSet(), resultingPatterns); + compareMaps(resultingPatterns, Lists.
newArrayList()); } /** @@ -274,7 +239,6 @@ public class NFAITCase extends TestLogger { @Test public void testSimplePatternWithTimeWindowNFA() { List
events = new ArrayList<>(); - List > resultingPatterns = new ArrayList<>(); final Event startEvent; final Event middleEvent; @@ -313,21 +277,11 @@ public class NFAITCase extends TestLogger { NFA nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false); - for (StreamRecord event: events) { -
[6/9] flink git commit: [FLINK-6578] [cep] Fix self-loop handling in SharedBuffer.
[FLINK-6578] [cep] Fix self-loop handling in SharedBuffer. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e4db423 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e4db423 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e4db423 Branch: refs/heads/master Commit: 8e4db423b79580de0cf66e905f8a66c12ea3748a Parents: 05ad87f Author: kkloudasAuthored: Mon May 15 14:33:09 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:37:33 2017 +0200 -- .../apache/flink/cep/nfa/ComputationState.java | 15 +- .../main/java/org/apache/flink/cep/nfa/NFA.java | 34 +- .../org/apache/flink/cep/nfa/SharedBuffer.java | 117 +++--- .../java/org/apache/flink/cep/nfa/State.java| 6 +- .../apache/flink/cep/nfa/StateTransition.java | 21 +- .../org/apache/flink/cep/nfa/NFAITCase.java | 364 +++ .../apache/flink/cep/nfa/SharedBufferTest.java | 78 ++-- .../cep/operator/CEPFrom12MigrationTest.java| 3 + .../cep/operator/CEPMigration11to13Test.java| 3 + .../flink/cep/operator/CEPOperatorTest.java | 201 ++ 10 files changed, 728 insertions(+), 114 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8e4db423/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java index 08b9b78..44f8f39 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java @@ -40,6 +40,8 @@ public class ComputationState { // the last taken event private final T event; + private final int counter; + // timestamp of the last taken event private final long timestamp; @@ -58,11 +60,13 @@ public class ComputationState { final State currentState, final State previousState, final T event, + 
final int counter, final long timestamp, final DeweyNumber version, final long startTimestamp) { this.state = currentState; this.event = event; + this.counter = counter; this.timestamp = timestamp; this.version = version; this.startTimestamp = startTimestamp; @@ -70,6 +74,10 @@ public class ComputationState { this.conditionContext = new ConditionContext(nfa, this); } + public int getCounter() { + return counter; + } + public ConditionContext getConditionContext() { return conditionContext; } @@ -108,12 +116,12 @@ public class ComputationState { public static ComputationState createStartState(final NFA nfa, final State state) { Preconditions.checkArgument(state.isStart()); - return new ComputationState<>(nfa, state, null, null, -1L, new DeweyNumber(1), -1L); + return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L); } public static ComputationState createStartState(final NFA nfa, final State state, final DeweyNumber version) { Preconditions.checkArgument(state.isStart()); - return new ComputationState<>(nfa, state, null, null, -1L, version, -1L); + return new ComputationState<>(nfa, state, null, null, 0, -1L, version, -1L); } public static ComputationState createState( @@ -121,10 +129,11 @@ public class ComputationState { final State currentState, final State previousState, final T event, + final int counter, final long timestamp, final DeweyNumber version, final long startTimestamp) { - return new ComputationState<>(nfa, currentState, previousState, event, timestamp, version, startTimestamp); + return new ComputationState<>(nfa, currentState, previousState, event, counter, timestamp, version, startTimestamp); } public boolean isStopState() { http://git-wip-us.apache.org/repos/asf/flink/blob/8e4db423/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java -- diff --git
[3/9] flink git commit: [FLINK-6255] [cep] Remove PatternStream.getSideOutput().
[FLINK-6255] [cep] Remove PatternStream.getSideOutput(). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05ad87f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05ad87f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05ad87f4 Branch: refs/heads/master Commit: 05ad87f4ce8c0aea6944feb14bf19795c1fc56c9 Parents: 02ea418 Author: kl0uAuthored: Fri May 12 16:01:38 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:37:32 2017 +0200 -- docs/dev/libs/cep.md| 41 +--- .../apache/flink/cep/scala/PatternStream.scala | 35 +-- .../org/apache/flink/cep/PatternStream.java | 55 +-- .../AbstractKeyedCEPPatternOperator.java| 24 - .../flink/cep/operator/CEPOperatorUtils.java| 9 +- .../cep/operator/KeyedCEPPatternOperator.java | 4 +- .../TimeoutKeyedCEPPatternOperator.java | 4 +- .../java/org/apache/flink/cep/CEPITCase.java| 98 +--- .../cep/operator/CEPFrom12MigrationTest.java| 6 -- .../cep/operator/CEPMigration11to13Test.java| 2 - .../flink/cep/operator/CEPOperatorTest.java | 2 - .../flink/cep/operator/CEPRescalingTest.java| 1 - 12 files changed, 13 insertions(+), 268 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/docs/dev/libs/cep.md -- diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index b379615..58e1a0a 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -806,46 +806,7 @@ in event time. To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last -seen watermark. Late elements are not further processed but they can be redirected to a [side output] -({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them. 
- -To access the stream of late elements, you first need to specify that you want to get the late data using -`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do -so, the late elements will be silently dropped. Then, you can get the side-output stream using the -`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and providing as argument the output tag used in -the `.sideOutputLateData(OutputTag)`: - - - -{% highlight java %} -final OutputTag lateOutputTag = new OutputTag("late-data"){}; - -PatternStream patternStream = CEP.pattern(...) -.sideOutputLateData(lateOutputTag); - -// main output with matches -DataStream result = patternStream.select(...) - -// side output containing the late events -DataStream lateStream = patternStream.getSideOutput(lateOutputTag); -{% endhighlight %} - - - -{% highlight scala %} -val lateOutputTag = OutputTag[T]("late-data") - -val patternStream: PatternStream[T] = CEP.pattern(...) -.sideOutputLateData(lateOutputTag) - -// main output with matches -val result = patternStream.select(...) - -// side output containing the late events -val lateStream = patternStream.getSideOutput(lateOutputTag) -{% endhighlight %} - - +seen watermark. Late elements are not further processed. 
## Examples http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala -- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala index d4bc28c..e71439c 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala @@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream} import org.apache.flink.cep.pattern.{Pattern => JPattern} import org.apache.flink.streaming.api.scala.{asScalaStream, _} -import org.apache.flink.util.{Collector, OutputTag} +import org.apache.flink.util.Collector import org.apache.flink.types.{Either => FEither} import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2} import java.lang.{Long => JLong} -import org.apache.flink.annotation.PublicEvolving import org.apache.flink.cep.operator.CEPOperatorUtils import org.apache.flink.cep.scala.pattern.Pattern @@
[2/9] flink git commit: [FLINK-6371] [cep] NFA return matched patterns as Map<String, List<T>>.
[FLINK-6371] [cep] NFA return matched patterns as Map. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae9c9d06 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae9c9d06 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae9c9d06 Branch: refs/heads/master Commit: ae9c9d061a8b49931c88908b8713cb2efe5f9202 Parents: 9244106 Author: kl0u Authored: Fri May 5 13:55:07 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:37:31 2017 +0200 -- .../org/apache/flink/cep/CEPLambdaTest.java | 11 +- .../apache/flink/cep/scala/PatternStream.scala | 31 +- ...StreamScalaJavaAPIInteroperabilityTest.scala | 33 +- .../flink/cep/PatternFlatSelectFunction.java| 3 +- .../flink/cep/PatternFlatTimeoutFunction.java | 3 +- .../apache/flink/cep/PatternSelectFunction.java | 3 +- .../org/apache/flink/cep/PatternStream.java | 29 +- .../flink/cep/PatternTimeoutFunction.java | 3 +- .../main/java/org/apache/flink/cep/nfa/NFA.java | 109 +--- .../org/apache/flink/cep/nfa/SharedBuffer.java | 10 +- .../flink/cep/operator/CEPOperatorUtils.java| 19 +- .../cep/operator/KeyedCEPPatternOperator.java | 17 +- .../TimeoutKeyedCEPPatternOperator.java | 23 +- .../java/org/apache/flink/cep/CEPITCase.java| 69 +-- .../org/apache/flink/cep/nfa/NFAITCase.java | 608 +++ .../java/org/apache/flink/cep/nfa/NFATest.java | 62 +- .../apache/flink/cep/nfa/SharedBufferTest.java | 17 +- .../flink/cep/nfa/compiler/NFACompilerTest.java | 1 - .../cep/operator/CEPFrom12MigrationTest.java| 57 +- .../cep/operator/CEPMigration11to13Test.java| 21 +- .../flink/cep/operator/CEPOperatorTest.java | 41 +- .../flink/cep/operator/CEPRescalingTest.java| 31 +- 22 files changed, 474 insertions(+), 727 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ae9c9d06/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java -- diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java 
b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java index 5957158..03fb3c6 100644 --- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java +++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java @@ -24,10 +24,13 @@ import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; +import org.junit.Ignore; import org.junit.Test; +import java.util.List; import java.util.Map; import static org.junit.Assert.*; @@ -41,6 +44,7 @@ public class CEPLambdaTest extends TestLogger { * Tests that a Java8 lambda can be passed as a CEP select function */ @Test + @Ignore public void testLambdaSelectFunction() { TypeInformation eventTypeInformation = TypeExtractor.getForClass(EventA.class); TypeInformation outputTypeInformation = TypeExtractor.getForClass(EventB.class); @@ -59,16 +63,17 @@ public class CEPLambdaTest extends TestLogger { PatternStream patternStream = new PatternStream<>(inputStream, dummyPattern); DataStream result = patternStream.select( - map -> new EventB() + (Map map) -> new EventB() ); assertEquals(outputTypeInformation, result.getType()); } /** -* Tests that a Java8 labmda can be passed as a CEP flat select function +* Tests that a Java8 lambda can be passed as a CEP flat select function */ @Test + @Ignore public void testLambdaFlatSelectFunction() { TypeInformation eventTypeInformation = TypeExtractor.getForClass(EventA.class); TypeInformation outputTypeInformation = TypeExtractor.getForClass(EventB.class); @@ -86,7 +91,7 @@ public class CEPLambdaTest extends TestLogger { PatternStream patternStream = new PatternStream<>(inputStream, dummyPattern); DataStream result = patternStream.flatSelect( - (map, collector) -> collector.collect(new 
EventB()) + (Map map, Collector collector) -> collector.collect(new EventB()) ); assertEquals(outputTypeInformation, result.getType());
flink git commit: [FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser
Repository: flink Updated Branches: refs/heads/release-1.3 954a100dd -> fe1316b33 [FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser This closes #3923. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe1316b3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe1316b3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe1316b3 Branch: refs/heads/release-1.3 Commit: fe1316b335cb970b7b721359d8a69e12fef700ba Parents: 954a100 Author: twalthrAuthored: Mon May 15 15:27:10 2017 +0200 Committer: twalthr Committed: Wed May 17 14:21:12 2017 +0200 -- docs/dev/table_api.md | 24 +- .../org/apache/flink/table/api/table.scala | 16 +- .../apache/flink/table/codegen/Compiler.scala | 4 +- .../table/expressions/ExpressionParser.scala| 397 --- .../flink/table/plan/ProjectionTranslator.scala | 2 +- .../flink/table/validate/FunctionCatalog.scala | 26 +- .../CastingStringExpressionTest.scala | 6 +- .../table/expressions/DecimalTypeTest.scala | 4 +- .../table/expressions/ScalarFunctionsTest.scala | 6 +- .../table/expressions/TemporalTypesTest.scala | 32 +- .../UserDefinedScalarFunctionTest.scala | 31 +- .../plan/util/RexProgramExtractorTest.scala | 6 +- 12 files changed, 251 insertions(+), 303 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/fe1316b3/docs/dev/table_api.md -- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index d105188..6a5ceee 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1038,9 +1038,9 @@ This is the EBNF grammar for expressions: expressionList = expression , { "," , expression } ; -expression = alias ; +expression = timeIndicator | overConstant | alias ; -alias = logic | ( logic , "AS" , fieldReference ) ; +alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ; logic = comparison , [ ( "&&" | "||" ) , comparison ] ; @@ -1052,9 +1052,11 @@ product 
= unary , [ ( "*" | "/" | "%") , unary ] ; unary = [ "!" | "-" ] , composite ; -composite = suffixed | atom ; +composite = over | nullLiteral | suffixed | atom ; -suffixed = interval | cast | as | aggregation | if | functionCall ; +suffixed = interval | cast | as | if | functionCall ; + +interval = timeInterval | rowInterval ; timeInterval = composite , "." , ("year" | "years" | "month" | "months" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ; @@ -1062,17 +1064,15 @@ rowInterval = composite , "." , "rows" ; cast = composite , ".cast(" , dataType , ")" ; -dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" ; +dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ; as = composite , ".as(" , fieldReference , ")" ; -aggregation = composite , ( ".sum" | ".sum0" | ".min" | ".max" | ".count" | ".avg" | ".start" | ".end" | ".stddev_pop" | ".stddev_samp" | ".var_pop" | ".var_samp" ) , [ "()" ] ; - if = composite , ".?(" , expression , "," , expression , ")" ; functionCall = composite , "." 
, functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ; -atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ; +atom = ( "(" , expression , ")" ) | literal | fieldReference ; fieldReference = "*" | identifier ; @@ -1082,10 +1082,16 @@ timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "DAY" | "DAY_TO_HOUR" | timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ; +over = composite , "over" , fieldReference ; + +overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_range" ; + +timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ; + {% endhighlight %} Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The -column names and function names follow Java identifier syntax. The column name `rowtime` is a reserved logical attribute in streaming environments. Expressions specified as Strings can also use prefix notation instead of suffix notation to call
flink git commit: [FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser
Repository: flink Updated Branches: refs/heads/master a2580171d -> 9244106b3 [FLINK-6587] [table] Simplification and bug fixing of the ExpressionParser This closes #3923. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9244106b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9244106b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9244106b Branch: refs/heads/master Commit: 9244106b334ef54ba3e39a3f2c0c76f46ae4ecd3 Parents: a258017 Author: twalthrAuthored: Mon May 15 15:27:10 2017 +0200 Committer: twalthr Committed: Wed May 17 14:15:07 2017 +0200 -- docs/dev/table_api.md | 24 +- .../org/apache/flink/table/api/table.scala | 16 +- .../apache/flink/table/codegen/Compiler.scala | 4 +- .../table/expressions/ExpressionParser.scala| 397 --- .../flink/table/plan/ProjectionTranslator.scala | 2 +- .../flink/table/validate/FunctionCatalog.scala | 26 +- .../CastingStringExpressionTest.scala | 6 +- .../table/expressions/DecimalTypeTest.scala | 4 +- .../table/expressions/ScalarFunctionsTest.scala | 6 +- .../table/expressions/TemporalTypesTest.scala | 32 +- .../UserDefinedScalarFunctionTest.scala | 31 +- .../plan/util/RexProgramExtractorTest.scala | 6 +- 12 files changed, 251 insertions(+), 303 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9244106b/docs/dev/table_api.md -- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index d105188..6a5ceee 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1038,9 +1038,9 @@ This is the EBNF grammar for expressions: expressionList = expression , { "," , expression } ; -expression = alias ; +expression = timeIndicator | overConstant | alias ; -alias = logic | ( logic , "AS" , fieldReference ) ; +alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ; logic = comparison , [ ( "&&" | "||" ) , comparison ] ; @@ -1052,9 +1052,11 @@ product = unary , 
[ ( "*" | "/" | "%") , unary ] ; unary = [ "!" | "-" ] , composite ; -composite = suffixed | atom ; +composite = over | nullLiteral | suffixed | atom ; -suffixed = interval | cast | as | aggregation | if | functionCall ; +suffixed = interval | cast | as | if | functionCall ; + +interval = timeInterval | rowInterval ; timeInterval = composite , "." , ("year" | "years" | "month" | "months" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ; @@ -1062,17 +1064,15 @@ rowInterval = composite , "." , "rows" ; cast = composite , ".cast(" , dataType , ")" ; -dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "DATE" | "TIME" | "TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" ; +dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ; as = composite , ".as(" , fieldReference , ")" ; -aggregation = composite , ( ".sum" | ".sum0" | ".min" | ".max" | ".count" | ".avg" | ".start" | ".end" | ".stddev_pop" | ".stddev_samp" | ".var_pop" | ".var_samp" ) , [ "()" ] ; - if = composite , ".?(" , expression , "," , expression , ")" ; functionCall = composite , "." 
, functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ; -atom = ( "(" , expression , ")" ) | literal | nullLiteral | fieldReference ; +atom = ( "(" , expression , ")" ) | literal | fieldReference ; fieldReference = "*" | identifier ; @@ -1082,10 +1082,16 @@ timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "DAY" | "DAY_TO_HOUR" | timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ; +over = composite , "over" , fieldReference ; + +overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_range" ; + +timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ; + {% endhighlight %} Here, `literal` is a valid Java literal, `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The -column names and function names follow Java identifier syntax. The column name `rowtime` is a reserved logical attribute in streaming environments. Expressions specified as Strings can also use prefix notation instead of suffix notation to call operators and
flink-web git commit: Remove term 'official' from docker blog post and add disclaimer
Repository: flink-web Updated Branches: refs/heads/asf-site f467722b5 -> edbd65a2e Remove term 'official' from docker blog post and add disclaimer Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/edbd65a2 Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/edbd65a2 Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/edbd65a2 Branch: refs/heads/asf-site Commit: edbd65a2e07cca32ad905c0cab5ce498ab0e79e9 Parents: f467722 Author: Robert Metzger Authored: Wed May 17 12:18:41 2017 +0200 Committer: Robert Metzger Committed: Wed May 17 12:28:04 2017 +0200 -- _config.yml| 1 - _posts/2017-05-16-official-docker-image.md | 8 +--- content/blog/feed.xml | 8 +--- content/blog/index.html| 6 +++--- content/blog/page2/index.html | 2 +- content/blog/page3/index.html | 2 +- content/blog/page4/index.html | 2 +- content/blog/page5/index.html | 2 +- content/index.html | 4 ++-- content/news/2017/05/16/official-docker-image.html | 10 ++ 10 files changed, 25 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink-web/blob/edbd65a2/_config.yml -- diff --git a/_config.yml b/_config.yml index 30db1b5..b94c71c 100644 --- a/_config.yml +++ b/_config.yml @@ -97,4 +97,3 @@ host: 0.0.0.0 # News Posts paginate: 10 paginate_path: "blog/page:num" - http://git-wip-us.apache.org/repos/asf/flink-web/blob/edbd65a2/_posts/2017-05-16-official-docker-image.md -- diff --git a/_posts/2017-05-16-official-docker-image.md b/_posts/2017-05-16-official-docker-image.md index d452abd..e2c76be 100644 --- a/_posts/2017-05-16-official-docker-image.md +++ b/_posts/2017-05-16-official-docker-image.md @@ -1,17 +1,17 @@ --- layout: post -title: "Introducing Official Docker Images for Apache Flink" +title: "Introducing Docker Images for Apache Flink" date: 2017-05-16 09:00:00 author: "Patrick Lucas (Data Artisans) and Ismaël Mejía (Talend)" author-twitter: "iemejia" categories: news --- -For some time, the 
Apache Flink community has provided scripts to build a Docker image to run Flink. Now, starting with version 1.2.1, Flink will have an [official Docker image](https://hub.docker.com/r/_/flink/). This image is maintained by the Flink community and curated by the [Docker](https://github.com/docker-library/official-images) team to ensure it meets the quality standards for container images of the Docker community. +For some time, the Apache Flink community has provided scripts to build a Docker image to run Flink. Now, starting with version 1.2.1, Flink will have a [Docker image](https://hub.docker.com/r/_/flink/) on the Docker Hub. This image is maintained by the Flink community and curated by the [Docker](https://github.com/docker-library/official-images) team to ensure it meets the quality standards for container images of the Docker community. A community-maintained way to run Apache Flink on Docker and other container runtimes and orchestrators is part of the ongoing effort by the Flink community to make Flink a first-class citizen of the container world. -If you want to use the official Docker image today you can get the latest version by running: +If you want to use the Docker image today you can get the latest version by running: docker pull flink @@ -24,3 +24,5 @@ With this image there are various ways to start a Flink cluster, both locally an While this announcement is an important milestone, it's just the first step to help users run containerized Flink in production. There are [improvements](https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20Docker%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC) to be made in Flink itself and we will continue to improve these Docker images and for the documentation and examples surrounding them. This is of course a team effort, so any contribution is welcome. 
The [docker-flink](https://github.com/docker-flink) GitHub organization hosts the source files to [generate the images](https://github.com/docker-flink/docker-flink) and the [documentation](https://github.com/docker-flink/docs/tree/master/flink) that is presented alongside the images on Docker Hub. + +*Disclaimer: The docker images are provided as a community project by individuals on a best-effort basis. They are not
flink git commit: [FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule
Repository: flink Updated Branches: refs/heads/release-1.3 60873b0c5 -> 954a100dd [FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule This closes #3924. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954a100d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954a100d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954a100d Branch: refs/heads/release-1.3 Commit: 954a100dd5fe6ecbb2aac40f33e472d85a5822df Parents: 60873b0 Author: twalthrAuthored: Tue May 16 16:34:54 2017 +0200 Committer: twalthr Committed: Wed May 17 11:54:46 2017 +0200 -- .../DataStreamLogicalWindowAggregateRule.scala | 34 +++- .../scala/stream/sql/WindowAggregateTest.scala | 6 ++-- .../calcite/RelTimeIndicatorConverterTest.scala | 3 +- 3 files changed, 23 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/954a100d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index 38de539..050e2cd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -22,6 +22,7 @@ import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.api.scala.{Session, Slide, Tumble} import 
org.apache.flink.table.api.{TableException, Window} @@ -33,31 +34,34 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo class DataStreamLogicalWindowAggregateRule extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") { - /** Returns a zero literal of the correct time type */ + /** Returns a reference to the time attribute with a time indicator type */ override private[table] def getInAggregateGroupExpression( rexBuilder: RexBuilder, - windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression) - - /** Returns a zero literal of the correct time type */ - override private[table] def getOutAggregateGroupExpression( - rexBuilder: RexBuilder, - windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression) - - private def createZeroLiteral( - rexBuilder: RexBuilder, windowExpression: RexCall): RexNode = { -val timeType = windowExpression.operands.get(0).getType -timeType match { +val timeAttribute = windowExpression.operands.get(0) +timeAttribute match { - case _ if FlinkTypeFactory.isTimeIndicatorType(timeType) => -rexBuilder.makeLiteral(0L, timeType, true) + case _ if FlinkTypeFactory.isTimeIndicatorType(timeAttribute.getType) => +timeAttribute case _ => -throw TableException(s"""Time attribute expected but $timeType encountered.""") +throw TableException( + s"""Time attribute expected but ${timeAttribute.getType} encountered.""") } } + /** Returns a zero literal of a timestamp type */ + override private[table] def getOutAggregateGroupExpression( + rexBuilder: RexBuilder, + windowExpression: RexCall): RexNode = { + +rexBuilder.makeLiteral( + 0L, + rexBuilder.getTypeFactory.createSqlType(SqlTypeName.TIMESTAMP), + true) + } + override private[table] def translateWindowExpression( windowExpr: RexCall, rowType: RelDataType): Window = { 
http://git-wip-us.apache.org/repos/asf/flink/blob/954a100d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 3729ef0..2022db8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++
flink git commit: [FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule
Repository: flink Updated Branches: refs/heads/master 7ad489d87 -> a2580171d [FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule This closes #3924. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2580171 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2580171 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2580171 Branch: refs/heads/master Commit: a2580171dd6e9044c0694deea83a2a2f1f9eb1ee Parents: 7ad489d Author: twalthrAuthored: Tue May 16 16:34:54 2017 +0200 Committer: twalthr Committed: Wed May 17 11:53:31 2017 +0200 -- .../DataStreamLogicalWindowAggregateRule.scala | 34 +++- .../scala/stream/sql/WindowAggregateTest.scala | 6 ++-- .../calcite/RelTimeIndicatorConverterTest.scala | 3 +- 3 files changed, 23 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index 38de539..050e2cd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -22,6 +22,7 @@ import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.api.scala.{Session, Slide, Tumble} import 
org.apache.flink.table.api.{TableException, Window} @@ -33,31 +34,34 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo class DataStreamLogicalWindowAggregateRule extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") { - /** Returns a zero literal of the correct time type */ + /** Returns a reference to the time attribute with a time indicator type */ override private[table] def getInAggregateGroupExpression( rexBuilder: RexBuilder, - windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression) - - /** Returns a zero literal of the correct time type */ - override private[table] def getOutAggregateGroupExpression( - rexBuilder: RexBuilder, - windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression) - - private def createZeroLiteral( - rexBuilder: RexBuilder, windowExpression: RexCall): RexNode = { -val timeType = windowExpression.operands.get(0).getType -timeType match { +val timeAttribute = windowExpression.operands.get(0) +timeAttribute match { - case _ if FlinkTypeFactory.isTimeIndicatorType(timeType) => -rexBuilder.makeLiteral(0L, timeType, true) + case _ if FlinkTypeFactory.isTimeIndicatorType(timeAttribute.getType) => +timeAttribute case _ => -throw TableException(s"""Time attribute expected but $timeType encountered.""") +throw TableException( + s"""Time attribute expected but ${timeAttribute.getType} encountered.""") } } + /** Returns a zero literal of a timestamp type */ + override private[table] def getOutAggregateGroupExpression( + rexBuilder: RexBuilder, + windowExpression: RexCall): RexNode = { + +rexBuilder.makeLiteral( + 0L, + rexBuilder.getTypeFactory.createSqlType(SqlTypeName.TIMESTAMP), + true) + } + override private[table] def translateWindowExpression( windowExpr: RexCall, rowType: RelDataType): Window = { 
http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 3729ef0..2022db8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++
[1/5] flink git commit: [FLINK-6555] [futures] Generalize ConjunctFuture to return results
Repository: flink Updated Branches: refs/heads/master dceb5cc17 -> 7ad489d87 [FLINK-6555] [futures] Generalize ConjunctFuture to return results The ConjunctFuture now returns the set of future values once it is completed. Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture The WaitingConjunctFuture waits for the completion of its futures. The future values are discarded making it more efficient than the ResultConjunctFuture which returns the futures' values. The WaitingConjunctFuture is instantiated via FutureUtils.waitForAll(Collection). This closes #3873. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c081201f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c081201f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c081201f Branch: refs/heads/master Commit: c081201fd6c3c97a932c09d971f24bf42102650f Parents: dceb5cc Author: Till RohrmannAuthored: Thu May 11 17:36:17 2017 +0200 Committer: Till Rohrmann Committed: Wed May 17 08:18:23 2017 +0200 -- .../flink/runtime/concurrent/FutureUtils.java | 131 +++ .../runtime/executiongraph/ExecutionGraph.java | 8 +- .../executiongraph/ExecutionJobVertex.java | 4 +- .../executiongraph/failover/FailoverRegion.java | 2 +- .../runtime/concurrent/FutureUtilsTest.java | 83 ++-- 5 files changed, 184 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c081201f/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 4948147..a27af56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -19,8 +19,11 @@ package org.apache.flink.runtime.concurrent; import 
org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.util.Preconditions; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -106,8 +109,9 @@ public class FutureUtils { /** * Creates a future that is complete once multiple other futures completed. -* The ConjunctFuture fails (completes exceptionally) once one of the Futures in the -* conjunction fails. +* The future fails (completes exceptionally) once one of the futures in the +* conjunction fails. Upon successful completion, the future returns the +* collection of the futures' results. * * The ConjunctFuture gives access to how many Futures in the conjunction have already * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. @@ -115,16 +119,16 @@ public class FutureUtils { * @param futures The futures that make up the conjunction. No null entries are allowed. * @return The ConjunctFuture that completes once all given futures are complete (or one fails). */ - public static ConjunctFuture combineAll(Collection> futures) { + public static ConjunctFuture combineAll(Collection> futures) { checkNotNull(futures, "futures"); - final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size()); + final ResultConjunctFuture conjunct = new ResultConjunctFuture<>(futures.size()); if (futures.isEmpty()) { - conjunct.complete(null); + conjunct.complete(Collections.emptyList()); } else { - for (Future future : futures) { + for (Future future : futures) { future.handle(conjunct.completionHandler); } } @@ -133,16 +137,32 @@ public class FutureUtils { } /** +* Creates a future that is complete once all of the given futures have completed. +* The future fails (completes exceptionally) once one of the given futures +* fails. 
+* +* The ConjunctFuture gives access to how many Futures have already +* completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. +* +* @param futures The futures to wait on. No null entries are allowed. +* @return The
[3/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices
http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 27603d0..f9052e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -36,6 +36,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -43,7 +44,7 @@ import org.junit.Test; /** * This class contains unit tests for the {@link BlobClient} with ssl enabled. */ -public class BlobClientSslTest { +public class BlobClientSslTest extends TestLogger { /** The buffer size used during the tests in bytes. */ private static final int TEST_BUFFER_SIZE = 17 * 1000; @@ -64,19 +65,14 @@ public class BlobClientSslTest { * Starts the SSL enabled BLOB server. 
*/ @BeforeClass - public static void startSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SSL_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); + sslClientConfig = new Configuration(); sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -88,20 +84,14 @@ public class BlobClientSslTest { * Starts the SSL disabled BLOB server. 
*/ @BeforeClass - public static void startNonSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setBoolean(BlobServerOptions.SSL_ENABLED, false); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startNonSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setBoolean(BlobServerOptions.SSL_ENABLED, false); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); clientConfig = new Configuration(); clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -114,13 +104,13 @@ public class BlobClientSslTest { * Shuts the BLOB server down. */ @AfterClass - public static void stopServers() { + public static void stopServers() throws IOException { if (BLOB_SSL_SERVER != null) { -
[4/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices
[FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices The HighAvailabilityService creates a single BlobStoreService instance which is shared by all BlobServer and BlobCache instances. The BlobStoreService's lifecycle is exclusively managed by the HighAvailabilityServices. This means that the BlobStore's content is only cleaned up if the HighAvailabilityService's HA data is cleaned up. Having this single point of control, makes it easier to decide when to discard HA data (e.g. in case of a successful job execution) and when to retain the data (e.g. for recovery). Close and cleanup all data of BlobStore in HighAvailabilityServices Use HighAvailabilityServices to create BlobStore Introduce BlobStoreService interface to hide close and closeAndCleanupAllData methods This closes #3864. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88b0f2ac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88b0f2ac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88b0f2ac Branch: refs/heads/master Commit: 88b0f2ac3fd788932d7f434ca57ba3718c3fa621 Parents: ebc1368 Author: Till RohrmannAuthored: Tue May 9 10:26:37 2017 +0200 Committer: Till Rohrmann Committed: Wed May 17 08:18:49 2017 +0200 -- .../org/apache/flink/hdfstests/HDFSTest.java| 14 ++- .../clusterframework/MesosTaskManager.scala | 6 +- .../runtime/webmonitor/WebRuntimeMonitor.java | 26 +- .../handlers/TaskManagerLogHandler.java | 11 ++- .../webmonitor/WebRuntimeMonitorITCase.java | 14 ++- .../handlers/TaskManagerLogHandlerTest.java | 11 ++- .../apache/flink/runtime/blob/BlobCache.java| 80 - .../apache/flink/runtime/blob/BlobServer.java | 38 .../apache/flink/runtime/blob/BlobService.java | 8 +- .../apache/flink/runtime/blob/BlobStore.java| 26 +- .../flink/runtime/blob/BlobStoreService.java| 32 +++ .../apache/flink/runtime/blob/BlobUtils.java| 44 +++-- .../org/apache/flink/runtime/blob/BlobView.java | 49 +++ 
.../flink/runtime/blob/FileSystemBlobStore.java | 11 ++- .../flink/runtime/blob/VoidBlobStore.java | 8 +- .../apache/flink/runtime/client/JobClient.java | 13 ++- .../runtime/client/JobListeningContext.java | 6 +- .../clusterframework/BootstrapTools.java| 8 +- .../librarycache/BlobLibraryCacheManager.java | 2 +- .../HighAvailabilityServicesUtils.java | 12 ++- .../nonha/AbstractNonHaServices.java| 5 +- .../zookeeper/ZooKeeperHaServices.java | 93 ++-- .../runtime/jobmaster/JobManagerServices.java | 2 +- .../runtime/taskexecutor/TaskExecutor.java | 5 +- .../runtime/webmonitor/WebMonitorUtils.java | 21 +++-- .../flink/runtime/jobmanager/JobManager.scala | 21 ++--- .../runtime/minicluster/FlinkMiniCluster.scala | 8 +- .../minicluster/LocalFlinkMiniCluster.scala | 8 +- .../flink/runtime/taskmanager/TaskManager.scala | 36 +--- .../runtime/blob/BlobCacheRetriesTest.java | 79 - .../runtime/blob/BlobCacheSuccessTest.java | 56 ++-- .../flink/runtime/blob/BlobClientSslTest.java | 52 +-- .../flink/runtime/blob/BlobClientTest.java | 16 ++-- .../flink/runtime/blob/BlobRecoveryITCase.java | 21 +++-- .../runtime/blob/BlobServerDeleteTest.java | 21 +++-- .../flink/runtime/blob/BlobServerGetTest.java | 41 +++-- .../flink/runtime/blob/BlobServerPutTest.java | 89 +-- .../flink/runtime/blob/BlobServerRangeTest.java | 10 +-- .../runtime/blob/TestingFailingBlobServer.java | 4 +- .../BlobLibraryCacheManagerTest.java| 33 +++ .../BlobLibraryCacheRecoveryITCase.java | 21 +++-- .../zookeeper/ZooKeeperRegistryTest.java| 7 +- .../jobmanager/JobManagerHARecoveryTest.java| 9 +- .../JobManagerLeaderElectionTest.java | 3 +- .../ZooKeeperLeaderRetrievalTest.java | 13 ++- .../runtime/metrics/TaskManagerMetricsTest.java | 2 +- ...askManagerComponentsStartupShutdownTest.java | 5 +- .../TaskManagerRegistrationTest.java| 5 +- .../src/test/resources/log4j-test.properties| 2 +- .../jobmanager/JobManagerRegistrationTest.scala | 3 +- .../testingUtils/TestingTaskManager.scala | 40 - 
...agerHAProcessFailureBatchRecoveryITCase.java | 1 + .../flink/yarn/TestingYarnTaskManager.scala | 25 +++--- .../YarnHighAvailabilityServices.java | 30 ++- .../org/apache/flink/yarn/YarnTaskManager.scala | 6 +- 55 files changed, 667 insertions(+), 545 deletions(-) --
[5/5] flink git commit: [FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion
[FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation and deletion This commit introduces a BlobServer#readWriteLock in order to synchronize file creation and deletion operations in BlobServerConnection and BlobServer. This prevents multiple put and delete operations from interfering with each other and with get operations. The get operations are synchronized using the read lock in order to guarantee some kind of parallelism. Add Get and Delete operation tests This closes #3888. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ad489d8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ad489d8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ad489d8 Branch: refs/heads/master Commit: 7ad489d87281b74c53d3b1a0dd97e56b7a8ef303 Parents: 88b0f2a Author: Till Rohrmann Authored: Wed May 10 17:38:49 2017 +0200 Committer: Till Rohrmann Committed: Wed May 17 08:19:05 2017 +0200 -- .../apache/flink/runtime/blob/BlobClient.java | 2 +- .../apache/flink/runtime/blob/BlobServer.java | 29 ++- .../runtime/blob/BlobServerConnection.java | 240 +++ .../runtime/blob/BlobServerDeleteTest.java | 73 +- .../flink/runtime/blob/BlobServerGetTest.java | 115 - .../flink/runtime/blob/BlobServerPutTest.java | 109 - .../src/test/resources/log4j-test.properties| 2 +- 7 files changed, 509 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index 49e54a1..fab3c5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -537,7 +537,7 @@ public final class BlobClient implements Closeable { throw new 
IOException("Server side error: " + cause.getMessage(), cause); } else { - throw new IOException("Unrecognized response"); + throw new IOException("Unrecognized response: " + response + '.'); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 937eab0..5ad4b6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -44,6 +44,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -85,6 +87,9 @@ public class BlobServer extends Thread implements BlobService { /** The maximum number of concurrent connections */ private final int maxConnections; + /** Lock guarding concurrent file accesses */ + private final ReadWriteLock readWriteLock; + /** * Shutdown hook thread to ensure deletion of the storage directory (or null if * the configured high availability mode does not equal{@link HighAvailabilityMode#NONE}) @@ -104,6 +109,7 @@ public class BlobServer extends Thread implements BlobService { public BlobServer(Configuration config, BlobStore blobStore) throws IOException { this.blobServiceConfiguration = checkNotNull(config); this.blobStore = checkNotNull(blobStore); + this.readWriteLock = new ReentrantReadWriteLock(); // configure and create the storage directory String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY); @@ -235,6 +241,13 @@ public class 
BlobServer extends Thread implements BlobService { return blobStore; } + /** +* Returns the lock used to guard file accesses +*/ + public ReadWriteLock getReadWriteLock() { + return readWriteLock; + } + @Override public void
[2/5] flink git commit: [FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore
[FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore In order to store completed checkpoints in an increasing order in ZooKeeper, the paths for the completed checkpoints are now generated by String.format("/%019d", checkpointId) instead of String.format("/%s", checkpointId). This makes sure that the converted long will always have the same length with leading 0s. Fix failing ZooKeeperCompletedCheckpointStoreITCase This closes #3884. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ebc13688 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ebc13688 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ebc13688 Branch: refs/heads/master Commit: ebc13688c9441adf61075707f40b361ee02597f3 Parents: c081201 Author: Till Rohrmann Authored: Fri May 12 14:23:37 2017 +0200 Committer: Till Rohrmann Committed: Wed May 17 08:18:27 2017 +0200 -- .../ZooKeeperCompletedCheckpointStore.java | 6 ++-- ...ZooKeeperCompletedCheckpointStoreITCase.java | 34 +--- 2 files changed, 33 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ebc13688/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index c8c68bc..95cfb0f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -425,8 +425,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi * @param checkpointId to convert to the path * @return Path created from the given checkpoint id */ - protected static String checkpointIdToPath(long 
checkpointId) { - return String.format("/%s", checkpointId); + public static String checkpointIdToPath(long checkpointId) { + return String.format("/%019d", checkpointId); } /** @@ -435,7 +435,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi * @param path in ZooKeeper * @return Checkpoint id parsed from the path */ - protected static long pathToCheckpointId(String path) { + public static long pathToCheckpointId(String path) { try { String numberString; http://git-wip-us.apache.org/repos/asf/flink/blob/ebc13688/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 73e0ed9..3fd7f1b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -137,11 +137,11 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); - assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID())); + assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); store.shutdown(JobStatus.FINISHED); assertEquals(0, store.getNumberOfRetainedCheckpoints()); - assertNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID())); + assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); store.recover(); @@ -161,12 +161,12 @@ public class 
ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); - assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID())); +
[2/5] flink git commit: [FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore
[FLINK-6284] Correct sorting of completed checkpoints in ZooKeeperStateHandleStore In order to store completed checkpoints in an increasing order in ZooKeeper, the paths for the completed checkpoints are now generated by String.format("/%019d", checkpointId) instead of String.format("/%s", checkpointId). This makes sure that the converted long will always have the same length with leading 0s. Fix failing ZooKeeperCompletedCheckpointStoreITCase This closes #3884. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/827d74e6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/827d74e6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/827d74e6 Branch: refs/heads/release-1.3 Commit: 827d74e69386cff87576972c1b69a16b92b730ae Parents: 9c6c965 Author: Till Rohrmann Authored: Fri May 12 14:23:37 2017 +0200 Committer: Till Rohrmann Committed: Wed May 17 08:16:54 2017 +0200 -- .../ZooKeeperCompletedCheckpointStore.java | 6 ++-- ...ZooKeeperCompletedCheckpointStoreITCase.java | 34 +--- 2 files changed, 33 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index c8c68bc..95cfb0f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -425,8 +425,8 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi * @param checkpointId to convert to the path * @return Path created from the given checkpoint id */ - protected static String checkpointIdToPath(long 
checkpointId) { - return String.format("/%s", checkpointId); + public static String checkpointIdToPath(long checkpointId) { + return String.format("/%019d", checkpointId); } /** @@ -435,7 +435,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi * @param path in ZooKeeper * @return Checkpoint id parsed from the path */ - protected static long pathToCheckpointId(String path) { + public static long pathToCheckpointId(String path) { try { String numberString; http://git-wip-us.apache.org/repos/asf/flink/blob/827d74e6/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 73e0ed9..3fd7f1b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -137,11 +137,11 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); - assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID())); + assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); store.shutdown(JobStatus.FINISHED); assertEquals(0, store.getNumberOfRetainedCheckpoints()); - assertNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID())); + assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); store.recover(); @@ -161,12 +161,12 @@ public class 
ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); - assertNotNull(client.checkExists().forPath(CheckpointsPath + "/" + checkpoint.getCheckpointID())); +
[4/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices
[FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices The HighAvailabilityService creates a single BlobStoreService instance which is shared by all BlobServer and BlobCache instances. The BlobStoreService's lifecycle is exclusively managed by the HighAvailabilityServices. This means that the BlobStore's content is only cleaned up if the HighAvailabilityService's HA data is cleaned up. Having this single point of control makes it easier to decide when to discard HA data (e.g. in case of a successful job execution) and when to retain the data (e.g. for recovery). Close and cleanup all data of BlobStore in HighAvailabilityServices Use HighAvailabilityServices to create BlobStore Introduce BlobStoreService interface to hide close and closeAndCleanupAllData methods This closes #3864. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3ea89a9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3ea89a9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3ea89a9 Branch: refs/heads/release-1.3 Commit: e3ea89a9fab39e7595c466bcd90c30c338c86f4e Parents: 827d74e Author: Till Rohrmann Authored: Tue May 9 10:26:37 2017 +0200 Committer: Till Rohrmann Committed: Wed May 17 08:16:59 2017 +0200 -- .../org/apache/flink/hdfstests/HDFSTest.java| 14 ++- .../clusterframework/MesosTaskManager.scala | 6 +- .../runtime/webmonitor/WebRuntimeMonitor.java | 26 +- .../handlers/TaskManagerLogHandler.java | 11 ++- .../webmonitor/WebRuntimeMonitorITCase.java | 14 ++- .../handlers/TaskManagerLogHandlerTest.java | 11 ++- .../apache/flink/runtime/blob/BlobCache.java| 80 - .../apache/flink/runtime/blob/BlobServer.java | 38 .../apache/flink/runtime/blob/BlobService.java | 8 +- .../apache/flink/runtime/blob/BlobStore.java| 26 +- .../flink/runtime/blob/BlobStoreService.java| 32 +++ .../apache/flink/runtime/blob/BlobUtils.java| 44 +++-- .../org/apache/flink/runtime/blob/BlobView.java | 49 +++ 
.../flink/runtime/blob/FileSystemBlobStore.java | 11 ++- .../flink/runtime/blob/VoidBlobStore.java | 8 +- .../apache/flink/runtime/client/JobClient.java | 13 ++- .../runtime/client/JobListeningContext.java | 6 +- .../clusterframework/BootstrapTools.java| 8 +- .../librarycache/BlobLibraryCacheManager.java | 2 +- .../HighAvailabilityServicesUtils.java | 12 ++- .../nonha/AbstractNonHaServices.java| 5 +- .../zookeeper/ZooKeeperHaServices.java | 93 ++-- .../runtime/jobmaster/JobManagerServices.java | 2 +- .../runtime/taskexecutor/TaskExecutor.java | 5 +- .../runtime/webmonitor/WebMonitorUtils.java | 21 +++-- .../flink/runtime/jobmanager/JobManager.scala | 21 ++--- .../runtime/minicluster/FlinkMiniCluster.scala | 8 +- .../minicluster/LocalFlinkMiniCluster.scala | 8 +- .../flink/runtime/taskmanager/TaskManager.scala | 36 +--- .../runtime/blob/BlobCacheRetriesTest.java | 79 - .../runtime/blob/BlobCacheSuccessTest.java | 56 ++-- .../flink/runtime/blob/BlobClientSslTest.java | 52 +-- .../flink/runtime/blob/BlobClientTest.java | 16 ++-- .../flink/runtime/blob/BlobRecoveryITCase.java | 21 +++-- .../runtime/blob/BlobServerDeleteTest.java | 21 +++-- .../flink/runtime/blob/BlobServerGetTest.java | 41 +++-- .../flink/runtime/blob/BlobServerPutTest.java | 89 +-- .../flink/runtime/blob/BlobServerRangeTest.java | 10 +-- .../runtime/blob/TestingFailingBlobServer.java | 4 +- .../BlobLibraryCacheManagerTest.java| 33 +++ .../BlobLibraryCacheRecoveryITCase.java | 21 +++-- .../zookeeper/ZooKeeperRegistryTest.java| 7 +- .../jobmanager/JobManagerHARecoveryTest.java| 9 +- .../JobManagerLeaderElectionTest.java | 3 +- .../ZooKeeperLeaderRetrievalTest.java | 13 ++- .../runtime/metrics/TaskManagerMetricsTest.java | 2 +- ...askManagerComponentsStartupShutdownTest.java | 5 +- .../TaskManagerRegistrationTest.java| 5 +- .../src/test/resources/log4j-test.properties| 2 +- .../jobmanager/JobManagerRegistrationTest.scala | 3 +- .../testingUtils/TestingTaskManager.scala | 40 - 
...agerHAProcessFailureBatchRecoveryITCase.java | 1 + .../flink/yarn/TestingYarnTaskManager.scala | 25 +++--- .../YarnHighAvailabilityServices.java | 30 ++- .../org/apache/flink/yarn/YarnTaskManager.scala | 6 +- 55 files changed, 667 insertions(+), 545 deletions(-) --
[1/5] flink git commit: [FLINK-6555] [futures] Generalize ConjunctFuture to return results
Repository: flink Updated Branches: refs/heads/release-1.3 4104409cc -> 60873b0c5 [FLINK-6555] [futures] Generalize ConjunctFuture to return results The ConjunctFuture now returns the set of future values once it is completed. Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture The WaitingConjunctFuture waits for the completion of its futures. The future values are discarded making it more efficient than the ResultConjunctFuture which returns the futures' values. The WaitingConjunctFuture is instantiated via FutureUtils.waitForAll(Collection). This closes #3873. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c6c9654 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c6c9654 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c6c9654 Branch: refs/heads/release-1.3 Commit: 9c6c9654e11dd31fa2323977fc02961811d6e518 Parents: 4104409 Author: Till RohrmannAuthored: Thu May 11 17:36:17 2017 +0200 Committer: Till Rohrmann Committed: Wed May 17 08:16:50 2017 +0200 -- .../flink/runtime/concurrent/FutureUtils.java | 131 +++ .../runtime/executiongraph/ExecutionGraph.java | 8 +- .../executiongraph/ExecutionJobVertex.java | 4 +- .../executiongraph/failover/FailoverRegion.java | 2 +- .../runtime/concurrent/FutureUtilsTest.java | 83 ++-- 5 files changed, 184 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9c6c9654/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index 4948147..a27af56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -19,8 +19,11 @@ package org.apache.flink.runtime.concurrent; 
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.util.Preconditions; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -106,8 +109,9 @@ public class FutureUtils { /** * Creates a future that is complete once multiple other futures completed. -* The ConjunctFuture fails (completes exceptionally) once one of the Futures in the -* conjunction fails. +* The future fails (completes exceptionally) once one of the futures in the +* conjunction fails. Upon successful completion, the future returns the +* collection of the futures' results. * * The ConjunctFuture gives access to how many Futures in the conjunction have already * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. @@ -115,16 +119,16 @@ public class FutureUtils { * @param futures The futures that make up the conjunction. No null entries are allowed. * @return The ConjunctFuture that completes once all given futures are complete (or one fails). */ - public static ConjunctFuture combineAll(Collection> futures) { + public static ConjunctFuture combineAll(Collection> futures) { checkNotNull(futures, "futures"); - final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size()); + final ResultConjunctFuture conjunct = new ResultConjunctFuture<>(futures.size()); if (futures.isEmpty()) { - conjunct.complete(null); + conjunct.complete(Collections.emptyList()); } else { - for (Future future : futures) { + for (Future future : futures) { future.handle(conjunct.completionHandler); } } @@ -133,16 +137,32 @@ public class FutureUtils { } /** +* Creates a future that is complete once all of the given futures have completed. +* The future fails (completes exceptionally) once one of the given futures +* fails. 
+* +* The ConjunctFuture gives access to how many Futures have already +* completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. +* +* @param futures The futures to wait on. No null entries are allowed. +* @return The
[3/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 5054107..c106b3f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -35,6 +35,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -42,7 +43,7 @@ import org.junit.Test; /** * This class contains unit tests for the {@link BlobClient} with ssl enabled. */ -public class BlobClientSslTest { +public class BlobClientSslTest extends TestLogger { /** The buffer size used during the tests in bytes. */ private static final int TEST_BUFFER_SIZE = 17 * 1000; @@ -63,19 +64,14 @@ public class BlobClientSslTest { * Starts the SSL enabled BLOB server. 
*/ @BeforeClass - public static void startSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SSL_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); + sslClientConfig = new Configuration(); sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -87,20 +83,14 @@ public class BlobClientSslTest { * Starts the SSL disabled BLOB server. 
*/ @BeforeClass - public static void startNonSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startNonSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); clientConfig = new Configuration(); clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -113,13 +103,13 @@ public class BlobClientSslTest { * Shuts the BLOB server down. */ @AfterClass - public static void stopServers() { + public static void stopServers() throws IOException { if