[GitHub] flink issue #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6.x

2018-07-25 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6391
  
@twalthr I've addressed most of your comments. Thanks a lot for the 
detailed review.

However, it now seems like the Elasticsearch 6.x IT cases are failing because 
Elasticsearch no longer supports running embedded nodes in Java unit tests. I'll 
have to look a bit more into how that can be fixed. Will ping you when that 
is addressed.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205037835
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
 ---
@@ -0,0 +1,27 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - 
%m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, 
testlogger
--- End diff --

Removed.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205037795
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it 
you must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
--- End diff --

Actually, we decided to move examples out of the test code. Removing this.
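For reference, a minimal sketch of the kind of usage such an example demonstrates, 
using the same imports as the removed file. The sink construction at the end is an 
assumption (builder-style); the exact API is whatever this PR finalizes.

    // Sketch only: index/type names, host, and the builder call are assumptions.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> input = env.fromElements("flink", "elasticsearch", "sink");

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

    ElasticsearchSinkFunction<String> sinkFunction = new ElasticsearchSinkFunction<String>() {
        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
            // One IndexRequest per incoming element, handed to the RequestIndexer.
            IndexRequest request = Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
            indexer.add(request);
        }
    };

    input.addSink(new ElasticsearchSink.Builder<>(httpHosts, sinkFunction).build());
    env.execute("Elasticsearch 6.x sink sketch");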


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205031956
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for 
Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for 
integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements 
EmbeddedElasticsearchNodeEnvironment {
+
+   private Node node;
+
+   @Override
+   public void start(File tmpDataFolder, String clusterName) throws 
Exception {
+   if (node == null) {
+   Settings settings = Settings.builder()
+   .put("cluster.name", clusterName)
+   .put("http.enabled", false)
+   .put("path.home", tmpDataFolder.getParent())
+   .put("path.data", 
tmpDataFolder.getAbsolutePath())
+   .put(NetworkModule.HTTP_TYPE_KEY, 
Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
--- End diff --

You're right, removed.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205029480
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end 
test" "$END_TO_END_DIR
 run_test "Elasticsearch (v1.7.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 
https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
 run_test "Elasticsearch (v2.3.5) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 
https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
 run_test "Elasticsearch (v5.1.2) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"
+run_test "Elasticsearch (v6.3.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz"
--- End diff --

Added a loop to wait until the Elasticsearch node is really running.
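For context, the kind of readiness check meant here, sketched in Java against the 
node's HTTP endpoint (the actual fix lives in the shell test script; the URL, port, 
and retry budget below are assumptions):

    import java.net.HttpURLConnection;
    import java.net.URL;

    /** Rough sketch: poll until the Elasticsearch HTTP endpoint answers, or give up. */
    public final class WaitForElasticsearch {
        public static void main(String[] args) throws Exception {
            URL endpoint = new URL("http://localhost:9200");  // assumed default port
            for (int attempt = 0; attempt < 60; attempt++) {
                try {
                    HttpURLConnection conn = (HttpURLConnection) endpoint.openConnection();
                    conn.setConnectTimeout(1000);
                    if (conn.getResponseCode() == 200) {
                        System.out.println("Elasticsearch is up.");
                        return;
                    }
                } catch (Exception ignored) {
                    // node not reachable yet, keep polling
                }
                Thread.sleep(1000);
            }
            throw new IllegalStateException("Elasticsearch did not come up in time.");
        }
    }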


---


[GitHub] flink pull request #6413: [FLINK-8993] [tests] Let general purpose DataStrea...

2018-07-25 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6413

[FLINK-8993] [tests] Let general purpose DataStream job use KryoSerializer 
via type extraction

## What is the purpose of the change

The general purpose DataStream job previously only used the `KryoSerializer` 
via a custom state serializer. This PR allows the job to also use the 
`KryoSerializer` via Flink's type extraction.

## Brief change log

- Adapt the state builders to be able to supply a state class instead of 
a state type serializer (a rough sketch of the idea follows below).
- Let `DataStreamAllroundTestJob` specify state serializers via state 
classes instead of a direct custom serializer.
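A rough sketch of the builder change; the builder name and methods are hypothetical 
and only illustrate accepting either a state class (serializer chosen via type 
extraction, falling back to Kryo for generic types) or an explicit serializer:

    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeutils.TypeSerializer;

    // Hypothetical builder for illustration; the real test-job builders may differ.
    public class StateDescriptorBuilder<T> {

        private TypeSerializer<T> serializer;  // explicit custom serializer, or ...
        private Class<T> stateClass;           // ... a class resolved via type extraction

        public StateDescriptorBuilder<T> withSerializer(TypeSerializer<T> serializer) {
            this.serializer = serializer;
            return this;
        }

        public StateDescriptorBuilder<T> withStateClass(Class<T> stateClass) {
            this.stateClass = stateClass;
            return this;
        }

        public ValueStateDescriptor<T> build(String name) {
            if (serializer != null) {
                return new ValueStateDescriptor<>(name, serializer);
            }
            // Passing a class lets Flink's type extraction choose the serializer,
            // which falls back to the KryoSerializer for generic (non-POJO) types.
            return new ValueStateDescriptor<>(name, stateClass);
        }
    }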

## Verifying this change

This is an extension to existing end-to-end tests 
(`test-streaming-savepoint.sh`).

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8993

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6413.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6413


commit 428d5427227343479b6d63daf7fced8f1bf9a69c
Author: Tzu-Li (Gordon) Tai 
Date:   2018-07-25T06:58:46Z

[FLINK-8993] [tests] Let general purpose DataStream job use KryoSerializer 
via type extraction




---


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204999405
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
--- End diff --

Having a local variable here seems a bit redundant, since we always adjust 
it right afterwards.


---


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204996330
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
--- End diff --

Just a minor nitpick here: `adjustmentEndTimeNanos` would be better named 
`adjustedEndTimeNanos`.


---


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204998677
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+   long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+   maxNumberOfRecordsPerFetch = 
adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), 
recordBatchSizeBytes);
+   processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}
 
+   /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+   throws InterruptedException {
+   long endTimeNanos = processingEndTimeNanos;
+   if (fetchIntervalMillis != 0) {
+   long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+   long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+   if (sleepTimeMillis > 0) {
+   Thread.sleep(sleepTimeMillis);
+   endTimeNanos = System.nanoTime();
+   }
+   }
+   return endTimeNanos;
+   }
+
+   /**
+* Calculates how many records to read each time through the loop based 
on a target throughput
+* and the measured frequenecy of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read 
operation
+*/
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
--- End diff --

If this is a method without side effects on the fields of the 
`ShardConsumer`, it might be better to make this method static and pass in 
the current `maxNumberOfRecordsPerFetch` as an argument.
This makes it clearer that it is only a utility method for calculating the 
number of records to read.
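A rough sketch of what the static variant could look like; the constants and the 
exact formula here are assumptions, only meant to show the suggested signature:

    // Hypothetical static variant for illustration, not the PR's implementation.
    private static final long SHARD_READ_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L; // assumed 2 MB/s
    private static final int MAX_RECORDS_PER_FETCH_LIMIT = 10_000;                    // assumed cap

    static int adaptRecordsToRead(
            long runLoopTimeNanos,
            int numRecords,
            long recordBatchSizeBytes,
            int currentMaxRecordsPerFetch) {

        if (numRecords == 0 || runLoopTimeNanos == 0) {
            return currentMaxRecordsPerFetch;
        }

        long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
        if (averageRecordSizeBytes == 0) {
            return currentMaxRecordsPerFetch;
        }

        // Number of read loops that fit into one second at the measured loop duration.
        double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;
        // Bytes each fetch may pull to stay at the per-shard throughput target.
        double bytesPerRead = SHARD_READ_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
        int adaptedMaxRecords = (int) (bytesPerRead / averageRecordSizeBytes);

        return Math.max(1, Math.min(MAX_RECORDS_PER_FETCH_LIMIT, adaptedMaxRecords));
    }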


---


[GitHub] flink issue #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfigSnapsh...

2018-07-25 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6392
  
LGTM, +1


---


[GitHub] flink pull request #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfi...

2018-07-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6392#discussion_r204684670
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java
 ---
@@ -46,8 +46,6 @@
public CompositeTypeSerializerConfigSnapshot() {}
 
public CompositeTypeSerializerConfigSnapshot(TypeSerializer... 
nestedSerializers) {
-   Preconditions.checkNotNull(nestedSerializers);
--- End diff --

Not sure about this. It should be perfectly fine to keep the null check 
here.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-23 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6391

[FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6.x PR

## What is the purpose of the change

This PR adds the Elasticsearch 6.x connector, as well as an end-to-end test for it.

## Brief change log

- Cherry-picked @cjolif's Elasticsearch 5.3+ compatibility and 6.x 
implementation changes
- Add an end-to-end test for ES 6.x

## Verifying this change

Run the new end-to-end test for ES 6.x.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9885

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6391.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6391






---


[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general purpose DataSt...

2018-07-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6351#discussion_r204342877
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
 ---
@@ -197,6 +210,11 @@ public void 
initializeState(FunctionInitializationContext context) throws Except
for (KeyRangeStates keyRange : snapshotKeyRanges.get()) 
{
keyRanges.add(keyRange);
}
+
+   // let event time start from the max of all event time 
progress across subtasks in the last execution
+   for (Long lastEventTime : lastEventTimes.get()) {
+   monotonousEventTime = 
Math.max(monotonousEventTime, lastEventTime);
--- End diff --

Actually, watermarks are not the direct concern here.
What this change does is ensure that all subtasks start from an event time 
that is guaranteed not to "jump back" in time.

Watermark extraction is not done within the source.


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-07-19 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6043
  
@cjolif no problem, thanks for the notice. I'll try to incorporate the 
changes you mentioned above into the previous work you've already done. Thanks a 
lot!


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-07-19 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6043
  
@cjolif Ideally we'd have the ES 6.x connector merged by the beginning of next 
week. Let me know if this is possible for you. I'll proceed to merge this PR 
first.


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-07-19 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6043
  
@cjolif do you think you would be able to quickly open a PR for the REST 
6.x connector that includes the new changes you mentioned, based on this one?


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-07-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6043
  
I took another look at the PR, and also talked with @tillrohrmann about 
merging this for 1.6.
I think this LGTM, and with these changes we will at least have an ES 5.x 
connector that is 5.3+ compatible.

Merging ..

After merging this, I'll also try cherry-picking your 6.x REST-based ES 
connector on top. If that works well, I'll merge that as well.


---


[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general purpose DataSt...

2018-07-17 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6351

[FLINK-9862] [test] Extend general purpose DataStream test to have a 
tumbling window

## What is the purpose of the change

This allows our end-to-end tests to have coverage for snapshotting / 
restoring timers, when configured to use different state backends.

## Brief change log

- Add a tumbling window to the `DataStreamAllAroundTestProgram` (see the sketch below)
- Change the default "max out of orderness" setting of the source generator to 0
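A rough sketch of the kind of window wiring this adds; the event type, key, window 
size, and timestamp handling are assumptions, not the exact test program code:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Hypothetical excerpt for illustration only.
    public class TumblingWindowSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            env.fromElements(Tuple2.of(1L, 1_000L), Tuple2.of(1L, 5_000L), Tuple2.of(2L, 12_000L))
                // f1 doubles as the (ascending) event timestamp in this sketch.
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<Long, Long> element) {
                        return element.f1;
                    }
                })
                .keyBy(0)
                // The tumbling event-time window is what exercises timer snapshot/restore.
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(1)
                .print();

            env.execute("Tumbling window sketch");
        }
    }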

## Verifying this change

This is a change that affects existing tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9862

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6351.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6351


commit a3bc102303481fa784b9c94d7838b1c2b5f65123
Author: Tzu-Li (Gordon) Tai 
Date:   2018-07-17T10:00:40Z

[FLINK-9862] [test] Extend general purpose DataStream test to have a 
tumbling window

This allows the end-to-end tests to have coverage for testing
checkpointing timers.




---


[GitHub] flink pull request #6273: [FLINK-9377] [core] Implement restore serializer f...

2018-07-06 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6273

[FLINK-9377] [core] Implement restore serializer factory method for simple 
composite serializers

## What is the purpose of the change

This PR is built on top of #6235. It is a WIP PR.

This PR implements the restore serializer factory method for all simple 
composite serializers (i.e., Flink serializers with nested serializers). More 
complex serializers such as the Scala serializers, POJO serializers, 
KryoSerializer, AvroSerializer, etc. will come as a follow-up PR.

## Brief change log

- Introduce the `CompositeTypeSerializer` base class, which wraps the 
configuration snapshotting logic and compatibility checks.
- Let all simple composite type serializers extend the 
`CompositeTypeSerializer`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9377-composite

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6273.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6273


commit 5fc4a36a144c3f8f22be7e21a4e542d3042d10b1
Author: Tzu-Li (Gordon) Tai 
Date:   2018-06-13T11:43:53Z

[FLINK-9377] [core] (part 1) Extend TypeSerializerConfigSnapshot as a 
factory for restoring serializers

This commit is the first step towards removing serializers from
checkpointed state meta info and making Flink checkpoints Java
serialization free.

Instead of writing serializers in checkpoints, and trying to read that
to obtain a restore serializer at restore time, we aim to only write the
config snapshot as the single source of truth and use it as a factory to
create a restore serializer.

This commit adds the method and signatures to the
TypeSerializerConfigSnapshot interface. Use of the method, as well as
properly implementing the method for all serializers, will be
implemented in follow-up commits.

commit 661eb6d34da450ed096a77f166a4cc62ce3efdba
Author: Tzu-Li (Gordon) Tai 
Date:   2018-06-14T09:52:06Z

[FLINK-9377] [core] (part 2) Remove fallback deserializer option from 
CompatibilityResult

Now that the config snapshot is used as a factory for the restore
serializer, it should be guaranteed that a restore serializer is always
available. This removes the need for the user to provide a "fallback"
convert serializer in the case where a migration is required.

commit c91d045c5eb6e355981e4edaa6d1a0d48e5d4a5e
Author: Tzu-Li (Gordon) Tai 
Date:   2018-06-14T14:41:45Z

[FLINK-9377] [core] (part 3) Deprecate TypeSerializerSerializationUtil

This commit deprecates all utility methods and classes related to
serializing serializers. All methods that will still be in use, i.e.
writing config snapshots, are now moved to a separate new
TypeSerializerConfigSnapshotSerializationUtil class.

commit e09f91469fb6c86f5d2f05b78a9db3d9af8cce87
Author: Tzu-Li (Gordon) Tai 
Date:   2018-06-18T14:24:08Z

[FLINK-9377] [core] (part 4) Introduce BackwardsCompatibleConfigSnapshot

The BackwardsCompatibleConfigSnapshot is a wrapper, dummy config
snapshot which wraps an actual config snapshot, as well as a
pre-existing serializer instance.

In previous versions, since the config snapshot wasn't a serializer
factory but simply a container for serializer parameters, previous
serializers didn't necessarily have config snapshots that are capable of
correctly creating a corresponding restore serializer.

In this case, since previous serializers still have serializers written
in the checkpoint, the backwards compatible solution would be to wrap
the written serializer and the config snapshot within the
BackwardsCompatibleConfigSnapshot dummy. When attempting to restore the
serializer, the wrapped serializer instance is returned instead of
actually calling the restore

[GitHub] flink issue #6206: [FLINK-9654] [core] Changed the check for anonymous class...

2018-07-02 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6206
  
Thanks for looking into this @zsolt-donca.
The changes look good to me, but as @yanghua mentioned, a test covering 
the previous bug would be nice here; it would allow the reviewer to see 
directly that this is a required fix.


---


[GitHub] flink pull request #6221: [FLINK-9686] [kinesis] Enable Kinesis authenticati...

2018-07-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6221#discussion_r199420811
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
 ---
@@ -45,29 +45,63 @@
/** Simply create AWS credentials by supplying the AWS access 
key ID and AWS secret key in the configuration properties. */
BASIC,
 
+   /** Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied. **/
+   ASSUME_ROLE,
+
/** A credentials provider chain will be used that searches for 
credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance 
metadata. **/
AUTO,
}
 
/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is 
used if not set). */
public static final String AWS_REGION = "aws.region";
 
+   /** The credential provider type to use when AWS credentials are 
required (BASIC is used if not set). */
+   public static final String AWS_CREDENTIALS_PROVIDER = 
"aws.credentials.provider";
+
/** The AWS access key ID to use when setting credentials provider type 
to BASIC. */
-   public static final String AWS_ACCESS_KEY_ID = 
"aws.credentials.provider.basic.accesskeyid";
+   public static final String AWS_ACCESS_KEY_ID = 
accessKeyId(AWS_CREDENTIALS_PROVIDER);
 
/** The AWS secret key to use when setting credentials provider type to 
BASIC. */
-   public static final String AWS_SECRET_ACCESS_KEY = 
"aws.credentials.provider.basic.secretkey";
-
-   /** The credential provider type to use when AWS credentials are 
required (BASIC is used if not set). */
-   public static final String AWS_CREDENTIALS_PROVIDER = 
"aws.credentials.provider";
+   public static final String AWS_SECRET_ACCESS_KEY = 
secretKey(AWS_CREDENTIALS_PROVIDER);
 
/** Optional configuration for profile path if credential provider type 
is set to be PROFILE. */
-   public static final String AWS_PROFILE_PATH = 
"aws.credentials.provider.profile.path";
+   public static final String AWS_PROFILE_PATH = 
profilePath(AWS_CREDENTIALS_PROVIDER);
 
/** Optional configuration for profile name if credential provider type 
is set to be PROFILE. */
-   public static final String AWS_PROFILE_NAME = 
"aws.credentials.provider.profile.name";
+   public static final String AWS_PROFILE_NAME = 
profileName(AWS_CREDENTIALS_PROVIDER);
 
/** The AWS endpoint for Kinesis (derived from the AWS region setting 
if not set). */
public static final String AWS_ENDPOINT = "aws.endpoint";
 
+   public static String accessKeyId(String prefix) {
+   return prefix + ".basic.accesskeyid";
+   }
+
+   public static String secretKey(String prefix) {
+   return prefix + ".basic.secretkey";
+   }
+
+   public static String profilePath(String prefix) {
+   return prefix + ".profile.path";
+   }
+
+   public static String profileName(String prefix) {
+   return prefix + ".profile.name";
+   }
+
+   public static String roleArn(String prefix) {
--- End diff --

Is there a reason to change the way key constants are defined in this class?
i.e., if the previous pattern was followed, users could just use 
`AwsConfigConstants.AWS_ROLE_ARN` to set a value for the role ARN, and likewise 
for the other new configurations.
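I.e., following the existing pattern, something like this could still be exposed 
(sketch; the Javadoc wording is an assumption, and further constants would follow 
the same scheme):

    /** The role ARN to use when the credential provider type is set to ASSUME_ROLE. */
    public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);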


---


[GitHub] flink issue #6209: [FLINK-9655][tests] Add missing parallelism parameters

2018-07-02 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6209
  
Thanks @zentol for fixing this, merging ..


---


[GitHub] flink pull request #6235: [FLINK-9377] [core] Remove serializers from checkp...

2018-07-02 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6235

[FLINK-9377] [core] Remove serializers from checkpointed state meta infos

## What is the purpose of the change

This PR is the first step towards a smoother state evolution experience.
It removes the behavior of writing serializers in checkpointed state meta 
infos (using Java serialization) and relying on them to be deserializable at 
restore time.
Instead, the configuration snapshots of serializers now double as a factory 
for creating the restore serializer, solidifying it as the single source of 
truth of information about the previous serializer of state.

With this change:
- Checkpoints / savepoints move towards being Java serialization-free
- The availability of the restore serializer is basically determined at 
compile time
- Potentially resolves caveats with macro-generated Scala serializers, which 
typically have anonymous classnames that are easily susceptible to change and 
therefore block successful savepoint restores due to how Java serialization works
- In conclusion: the written configuration snapshot is now the single point 
of entry for obtaining a serializer for previous state. The user is only 
required to guarantee that the configuration snapshot's classname remains 
constant for the restore to proceed (a rough sketch of the factory idea follows below).
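A minimal sketch of the factory idea with a hypothetical serializer; the exact 
method names and contract on `TypeSerializerConfigSnapshot` may differ from what 
this PR ends up with:

    // Hypothetical config snapshot acting as a restore-serializer factory.
    // Class names and the precise base-class contract are illustrative only.
    public class MyElementSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {

        private static final int VERSION = 1;

        /** Nullary constructor, required so the snapshot itself can be deserialized. */
        public MyElementSerializerConfigSnapshot() {}

        @Override
        public int getVersion() {
            return VERSION;
        }

        public TypeSerializer<MyElement> restoreSerializer() {
            // The snapshot is the single source of truth: it can always construct
            // a serializer that understands the previously written state format,
            // without ever Java-deserializing the old serializer instance.
            return new MyElementSerializer();
        }
    }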

This PR is a WIP that only includes extending the 
`TypeSerializerConfigSnapshot` interface with a `restoreSerializer` 
method, as well as the method's interplay in the state backends after removing 
serializers from checkpointed state meta infos.

This PR does **NOT** include:
- Proper implementation of the new `restoreSerializer` method on all 
serializers.
- Tests for snapshotting, restoring, and migrating serializers and their 
interplay in the state backends.

Because of this, it is expected that existing tests will fail.
Follow-up PRs will be opened for the above mentioned missing parts.

## Brief change log

- 5fc4a36 Add a `restoreSerializer` method to the 
`TypeSerializerConfigSnapshot` interface

The method still has a dummy base implementation, because this PR doesn't 
yet properly implement the method for all serializers. Once that is 
accomplished, the base implementation should be removed.

- 661eb6d Remove the "fallback" serializer option from `CompatibilityResult`

That option was available in the past to allow users to have a safety path 
for state conversion, in case their previous serializer cannot be deserialized 
due to any reason blocked by Java serialization. Since now we use the config 
snapshot as the restore serializer factory, it is guaranteed that the restore 
serializer is always available in case conversion is required, and therefore 
voids the need for the "fallback" serializer option.

- c91d045 Deprecates any utility methods that still have the behaviour of 
writing serializers in checkpoints

- e09f914 Introduces the `BackwardsCompatibleConfigSnapshot` class

The BackwardsCompatibleConfigSnapshot is a wrapper, dummy config
snapshot which wraps an actual config snapshot, as well as a
pre-existing serializer instance.

In previous versions, since the config snapshot wasn't a serializer
factory but simply a container for serializer parameters, previous
serializers didn't necessarily have config snapshots that are capable of
correctly creating a corresponding restore serializer.

In this case, since previous serializers still have serializers written
in the checkpoint, the backwards compatible solution would be to wrap
the written serializer and the config snapshot within the
BackwardsCompatibleConfigSnapshot dummy. When attempting to restore the
serializer, the wrapped serializer instance is returned instead of
actually calling the restoreSerializer method of the wrapped config
snapshot.

- da84665 the actual removal of serializers from checkpointed state meta 
info

## Verifying this change

This PR is a WIP preview, and tests are expected to fail for the reasons 
mentioned in the description.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


---

[GitHub] flink issue #6177: Backport of Kinesis connector changes from 1.5 to release...

2018-06-22 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6177
  
@tweise I've manually merged this, but forgot to add the "This closes 
#" message.
Could you close this PR manually? Thanks!


---


[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

2018-06-22 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6021
  
@fmthoma I've merged this manually. Thanks for the contribution.
Could you close this PR?


---


[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

2018-06-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6021
  
Thanks @fmthoma, will proceed to merge this ..


---


[GitHub] flink issue #6197: [FLINK-9638][E2E Tests] Add helper script to run single t...

2018-06-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6197
  
Tried this locally, and it works.
+1, will merge this ..


---


[GitHub] flink issue #6172: [FLINK-9594][E2E tests] Improve docs for new test runner

2018-06-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6172
  
Thanks @florianschmidt1994, merging this ..


---


[GitHub] flink issue #6193: [hotfix] [docs] Fix typo in index.md

2018-06-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6193
  
LGTM, thanks for catching this @MichealShin, will merge this.


---


[GitHub] flink issue #6182: [FLINK-8795] Fixed local scala shell for Flip6

2018-06-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6182
  
Merging this ..


---


[GitHub] flink issue #6177: Backport of Kinesis connector changes from 1.5 to release...

2018-06-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6177
  
@tweise ok, I don't really have a strong opinion on not including the other 
changes.

Changes LGTM then, will merge this.


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197071117
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
--- End diff --

Would like to request one more slight change here:
Let this method return a boolean that indicates whether or not flushing 
occurred.

The caller of this method can then use the flag to decide whether or not 
`checkAndPropagateAsyncError` is required.
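A sketch of the suggested shape; the loop body is simplified and the caller shown 
is only illustrative:

    /** Returns true if the queue limit was hit and at least one backpressure wait happened. */
    private boolean enforceQueueLimit() throws InterruptedException {
        int attempt = 0;
        while (producer.getOutstandingRecordsCount() >= queueLimit) {
            backpressureCycles.inc();
            attempt++;
            backpressureLatch.await(100);
        }
        return attempt > 0;
    }

    // Caller, e.g. in invoke():
    if (enforceQueueLimit()) {
        checkAndPropagateAsyncError();
    }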


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197067733
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
+   int attempt = 0;
+   while (producer.getOutstandingRecordsCount() >= queueLimit) {
+   backpressureCycles.inc();
+   if (attempt >= 10) {
+   LOG.warn("Waiting for the queue length to drop 
below the limit takes unusually long, still not done after {} attempts.", 
attempt);
+   }
+   attempt++;
+   try {
+   backpressureLatch.await(100);
--- End diff --

We might want to make the wait time configurable? (as a separate PR)
My reasoning is that it directly affects how long until the "flush taking 
unusually long" message starts popping up.


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197065346
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -55,6 +58,13 @@
 @PublicEvolving
 public class FlinkKinesisProducer extends RichSinkFunction 
implements CheckpointedFunction {
 
+   public static final String KINESIS_PRODUCER_METRIC_GROUP = 
"kinesisProducer";
+
+   public static final String METRIC_BACKPRESSURE_CYCLES = 
"backpressureCycles";
+
+   public static final String METRIC_OUTSTANDING_RECORDS_COUNT = 
"outstandingRecordsCount";
+
+
--- End diff --

nit: unnecessary line


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197067136
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
+   int attempt = 0;
+   while (producer.getOutstandingRecordsCount() >= queueLimit) {
+   backpressureCycles.inc();
+   if (attempt >= 10) {
+   LOG.warn("Waiting for the queue length to drop 
below the limit takes unusually long, still not done after {} attempts.", 
attempt);
+   }
+   attempt++;
+   try {
+   backpressureLatch.await(100);
--- End diff --

I like this implementation a lot better now 👍 


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197065961
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -144,6 +163,17 @@ public void setFailOnError(boolean failOnError) {
this.failOnError = failOnError;
}
 
+   /**
+* The {@link KinesisProducer} holds an unbounded queue internally. To 
avoid memory
+* problems under high loads, a limit can be employed above which the 
internal queue
+* will be flushed, thereby applying backpressure.
+*
+* @param queueLimit The maximum length of the internal queue before 
backpressuring
+*/
+   public void setQueueLimit(int queueLimit) {
+   this.queueLimit = queueLimit;
--- End diff --

Will need argument checks on the given `queueLimit`.


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197070282
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java
 ---
@@ -267,6 +268,79 @@ public void go() throws Exception {
testHarness.close();
}
 
+   /**
+* Test ensuring that the producer blocks if the queue limit is 
exceeded,
+* until the queue length drops below the limit;
+* we set a timeout because the test will not finish if the logic is 
broken.
+*/
+   @Test(timeout = 1)
+   public void testBackpressure() throws Throwable {
+   final DummyFlinkKinesisProducer producer = new 
DummyFlinkKinesisProducer<>(new SimpleStringSchema());
+   producer.setQueueLimit(1);
+
+   OneInputStreamOperatorTestHarness testHarness =
+   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
+
+   testHarness.open();
+
+   UserRecordResult result = mock(UserRecordResult.class);
+   when(result.isSuccessful()).thenReturn(true);
+
+   CheckedThread msg1 = new CheckedThread() {
+   @Override
+   public void go() throws Exception {
+   testHarness.processElement(new 
StreamRecord<>("msg-1"));
+   }
+   };
+   msg1.start();
+   msg1.trySync(100);
+   assertFalse("Flush triggered before reaching queue limit", 
msg1.isAlive());
--- End diff --

I wonder if this would introduce flakiness in the test.
@fmthoma could you elaborate a bit here?


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197068370
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+public class TimeoutLatch {
--- End diff --

This needs to be annotated as `@Internal`
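I.e., a sketch of the suggested change:

    import org.apache.flink.annotation.Internal;

    /** Internal utility, not part of the public API. */
    @Internal
    public class TimeoutLatch {
        // existing implementation unchanged
    }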


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197069244
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+public class TimeoutLatch {
+
+   private final Object lock = new Object();
+   private volatile boolean waiting;
+
+   public void await(long timeout) throws InterruptedException {
+   synchronized (lock) {
+   waiting = true;
+   lock.wait(timeout);
+   }
+   }
+
+   public void trigger() {
+   if (waiting) {
+   synchronized (lock) {
+   waiting = false;
--- End diff --

I agree with @fmthoma here.


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197064931
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
--- End diff --

The moving-average calculation that you described could maybe just be an 
implementation of the limit supplier function.


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197063764
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
--- End diff --

A user-provided queue limit supplier function sounds like a good idea.
As you mentioned, this can come as a follow-up PR.
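A rough sketch of what such a follow-up could look like (hypothetical API, not part 
of this PR; note the supplier is made `Serializable` because the sink function 
itself gets serialized):

    // Hypothetical follow-up: let the user supply the limit dynamically, e.g. backed
    // by a moving average of observed record sizes.
    public interface QueueLimitSupplier extends java.io.Serializable {
        int currentQueueLimit();
    }

    private QueueLimitSupplier queueLimitSupplier = () -> 100_000;  // assumed default

    public void setQueueLimit(QueueLimitSupplier queueLimitSupplier) {
        this.queueLimitSupplier = Preconditions.checkNotNull(queueLimitSupplier);
    }

    private void enforceQueueLimit() {
        int currentLimit = queueLimitSupplier.currentQueueLimit();
        while (producer.getOutstandingRecordsCount() >= currentLimit) {
            backpressureCycles.inc();
            try {
                // Same backpressure wait as in the current implementation.
                backpressureLatch.await(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }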


---


[GitHub] flink issue #6182: [FLINK-8795] Fixed local scala shell for Flip6

2018-06-20 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6182
  
Thanks for the fix @dawidwys!
The changes LGTM on my side, +1.


---


[GitHub] flink pull request #6172: [FLINK-9594][E2E tests] Improve docs for new test ...

2018-06-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6172#discussion_r195992732
  
--- Diff: flink-end-to-end-tests/README.md ---
@@ -33,6 +33,53 @@ $ FLINK_DIR= 
flink-end-to-end-tests/test-scripts/test_batch_wordcount
 
 ## Writing Tests
 
-Have a look at test_batch_wordcount.sh for a very basic test and
-test_streaming_kafka010.sh for a more involved example. Whenever possible, 
try
-to put new functionality in common.sh so that it can be reused by other 
tests.
+### Examples
+Have a look at `test_batch_wordcount.sh` for a very basic test and
+`test_streaming_kafka010.sh` for a more involved example. Whenever 
possible, try
+to put new functionality in `common.sh` so that it can be reused by other 
tests.
+
+### Adding a test case
+In order to add a new test case you need a new line to either 
`test-scripts/run-nightly-tests.sh` and / or 
`test-scripts/run-pre-commit-tests.sh`
+
+Adding a new test case generally follows the following pattern
+
+```sh
+run_test "simple end-to-end test" "$END_TO_END_DIR/test-scripts/simple.sh 
arg1 arg2"
+```
+
+_Note: If you want to parameterize your tests please do so by adding 
multiple test cases with parameters as arguments to the nightly / pre-commit 
test suites. This allows the test runner to do a cleanup in between each 
individual test and also to fail those tests individually._
+
+### Passing your test
+A test is considered to have passed if it:
+- has exit code 0
+- there are no non-empty .out files (nothing was written to stdout / 
stderr by your flink program)
+- there are no exceptions in the log files
+- there are no errors in the log files
+
+### Failing your test
+A test is considered to have failed if it:
+- exited with non-zero exit code
+- has non-empty *.out files (something was written to stdout / stderr by 
your flink program)
+- there are errors in the log files
+- there are exceptions in the log files
+
+_There is a whitelist for exceptions and errors that do not lead to 
failure, they can be found in the `check_logs_for_errors` and 
`check_logs_for_exceptions` in `test-scripts/common.sh`_
+
+Please note that a previously supported pattern where you could assign a 
value the global variable `PASS` to have your tests fail **is not supported 
anymore**
--- End diff --

Missing period at the end.


---


[GitHub] flink pull request #6172: [FLINK-9594][E2E tests] Improve docs for new test ...

2018-06-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6172#discussion_r195992679
  
--- Diff: flink-end-to-end-tests/README.md ---
@@ -33,6 +33,53 @@ $ FLINK_DIR= 
flink-end-to-end-tests/test-scripts/test_batch_wordcount
 
 ## Writing Tests
 
-Have a look at test_batch_wordcount.sh for a very basic test and
-test_streaming_kafka010.sh for a more involved example. Whenever possible, 
try
-to put new functionality in common.sh so that it can be reused by other 
tests.
+### Examples
+Have a look at `test_batch_wordcount.sh` for a very basic test and
+`test_streaming_kafka010.sh` for a more involved example. Whenever 
possible, try
+to put new functionality in `common.sh` so that it can be reused by other 
tests.
+
+### Adding a test case
+In order to add a new test case you need a new line to either 
`test-scripts/run-nightly-tests.sh` and / or 
`test-scripts/run-pre-commit-tests.sh`
+
+Adding a new test case generally follows the following pattern
+
+```sh
+run_test "simple end-to-end test" "$END_TO_END_DIR/test-scripts/simple.sh 
arg1 arg2"
+```
+
+_Note: If you want to parameterize your tests please do so by adding 
multiple test cases with parameters as arguments to the nightly / pre-commit 
test suites. This allows the test runner to do a cleanup in between each 
individual test and also to fail those tests individually._
+
+### Passing your test
+A test is considered to have passed if it:
+- has exit code 0
+- there are no non-empty .out files (nothing was written to stdout / 
stderr by your flink program)
+- there are no exceptions in the log files
+- there are no errors in the log files
+
+### Failing your test
+A test is considered to have failed if it:
+- exited with non-zero exit code
+- has non-empty *.out files (something was written to stdout / stderr by 
your flink program)
+- there are errors in the log files
+- there are exceptions in the log files
+
+_There is a whitelist for exceptions and errors that do not lead to 
failure, they can be found in the `check_logs_for_errors` and 
`check_logs_for_exceptions` in `test-scripts/common.sh`_
--- End diff --

`... not lead to failure, they can be found ...` --> `.. not lead to 
failure, "which" can be found`


---


[GitHub] flink pull request #6172: [FLINK-9594][E2E tests] Improve docs for new test ...

2018-06-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6172#discussion_r195992920
  
--- Diff: flink-end-to-end-tests/README.md ---
@@ -33,6 +33,53 @@ $ FLINK_DIR= 
flink-end-to-end-tests/test-scripts/test_batch_wordcount
 
 ## Writing Tests
 
-Have a look at test_batch_wordcount.sh for a very basic test and
-test_streaming_kafka010.sh for a more involved example. Whenever possible, 
try
-to put new functionality in common.sh so that it can be reused by other 
tests.
+### Examples
+Have a look at `test_batch_wordcount.sh` for a very basic test and
+`test_streaming_kafka010.sh` for a more involved example. Whenever 
possible, try
+to put new functionality in `common.sh` so that it can be reused by other 
tests.
+
+### Adding a test case
+In order to add a new test case you need a new line to either 
`test-scripts/run-nightly-tests.sh` and / or 
`test-scripts/run-pre-commit-tests.sh`
+
+Adding a new test case generally follows the following pattern
+
+```sh
+run_test "simple end-to-end test" "$END_TO_END_DIR/test-scripts/simple.sh 
arg1 arg2"
+```
+
+_Note: If you want to parameterize your tests please do so by adding 
multiple test cases with parameters as arguments to the nightly / pre-commit 
test suites. This allows the test runner to do a cleanup in between each 
individual test and also to fail those tests individually._
+
+### Passing your test
+A test is considered to have passed if it:
+- has exit code 0
+- there are no non-empty .out files (nothing was written to stdout / 
stderr by your flink program)
+- there are no exceptions in the log files
+- there are no errors in the log files
+
+### Failing your test
+A test is considered to have failed if it:
+- exited with non-zero exit code
+- has non-empty *.out files (something was written to stdout / stderr by 
your flink program)
+- there are errors in the log files
+- there are exceptions in the log files
+
+_There is a whitelist for exceptions and errors that do not lead to 
failure, they can be found in the `check_logs_for_errors` and 
`check_logs_for_exceptions` in `test-scripts/common.sh`_
+
+Please note that a previously supported pattern where you could assign a 
value the global variable `PASS` to have your tests fail **is not supported 
anymore**
+
+### Cleanup
+The test runner performs a cleanup after each test case, which includes:
+- Stopping the cluster
+- Killing all task and job managers
+- Reverting config to default (if changed before)
+- Cleaning up log and temp directories
+
+If your test case should do some *additional* cleanup, a common pattern is 
to trap a `test_cleanup` function to `EXIT` in your test case like so:
--- End diff --

I would explicitly state that this typically consists of shutting down 
external systems used by the e2e test, e.g. Kafka / Elasticsearch.
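
For example, such a hook could look roughly like this (just a sketch; `shutdown_elasticsearch_cluster` is an assumed helper name from `elasticsearch-common.sh`):

```
function test_cleanup {
  # additional cleanup on top of the runner's standard cleanup:
  # shut down the external systems this test started, e.g. Kafka / Elasticsearch
  shutdown_elasticsearch_cluster
}
trap test_cleanup EXIT
```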


---


[GitHub] flink pull request #6172: [FLINK-9594][E2E tests] Improve docs for new test ...

2018-06-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6172#discussion_r195991753
  
--- Diff: flink-end-to-end-tests/README.md ---
@@ -33,6 +33,53 @@ $ FLINK_DIR= 
flink-end-to-end-tests/test-scripts/test_batch_wordcount
 
 ## Writing Tests
 
-Have a look at test_batch_wordcount.sh for a very basic test and
-test_streaming_kafka010.sh for a more involved example. Whenever possible, 
try
-to put new functionality in common.sh so that it can be reused by other 
tests.
+### Examples
+Have a look at `test_batch_wordcount.sh` for a very basic test and
+`test_streaming_kafka010.sh` for a more involved example. Whenever 
possible, try
+to put new functionality in `common.sh` so that it can be reused by other 
tests.
+
+### Adding a test case
+In order to add a new test case you need a new line to either 
`test-scripts/run-nightly-tests.sh` and / or 
`test-scripts/run-pre-commit-tests.sh`
+
+Adding a new test case generally follows the following pattern
+
+```sh
+run_test "simple end-to-end test" "$END_TO_END_DIR/test-scripts/simple.sh 
arg1 arg2"
+```
+
+_Note: If you want to parameterize your tests please do so by adding 
multiple test cases with parameters as arguments to the nightly / pre-commit 
test suites. This allows the test runner to do a cleanup in between each 
individual test and also to fail those tests individually._
+
+### Passing your test
+A test is considered to have passed if it:
+- has exit code 0
+- there are no non-empty .out files (nothing was written to stdout / 
stderr by your flink program)
--- End diff --

Capital `f` for `Flink`


---


[GitHub] flink issue #6177: Backport of Kinesis connector changes from 1.5 to release...

2018-06-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6177
  
I'm not sure that we want to backport the shard-mapping hash function. IMO, 
that is a new feature and shouldn't go into a bugfix release.

What do you think @tweise?


---


[GitHub] flink issue #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass" messag...

2018-06-14 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6053
  
```

==
Running 'Streaming Python Wordcount end-to-end test'

==
Flink dist directory: /home/travis/build/apache/flink/build-target
TEST_DATA_DIR: 
/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-10412258255
Starting cluster.
Starting standalonesession daemon on host 
travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8.
Starting taskexecutor daemon on host 
travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8.
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Dispatcher REST endpoint is up.
Starting execution of program
Program execution finished
Job with JobID 06184a085272dd12b3573b1bcb96badc has finished.
Job Runtime: 6103 ms
pass StreamingPythonWordCount
Stopping taskexecutor daemon (pid: 31303) on host 
travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8.
Stopping standalonesession daemon (pid: 30988) on host 
travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8.

[PASS] 'Streaming Python Wordcount end-to-end test' passed after 0 minutes 
and 24 seconds! Test exited with exit code 0.



==
Running 'Wordcount end-to-end test'

==
Flink dist directory: /home/travis/build/apache/flink/build-target
TEST_DATA_DIR: 
/home/travis/build/apache/flink/flink-end-to-end-tests/test-scripts/temp-test-directory-36174383269
Starting cluster.
Starting standalonesession daemon on host 
travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8.
Starting taskexecutor daemon on host 
travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8.
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Waiting for dispatcher REST endpoint to come up...
Dispatcher REST endpoint is up.
Starting execution of program
Program execution finished
Job with JobID 30256ad7ff23ea8543ddca76bacaaee5 has finished.
Job Runtime: 1352 ms
pass WordCount
Stopping taskexecutor daemon (pid: 835) on host 
travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8.
Stopping standalonesession daemon (pid: 517) on host 
travis-job-363a754a-fe5f-4873-bbe8-fe4064b95bc8.

[PASS] 'Wordcount end-to-end test' passed after 0 minutes and 11 seconds! 
Test exited with exit code 0.
```

Travis logs excerpt looks good. Follow-up commits look good.
+1, LGTM on my side.


---


[GitHub] flink pull request #6151: [FLINK-9569] [avro] Fix confusing construction of ...

2018-06-12 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6151

[FLINK-9569] [avro] Fix confusing construction of GenericRecord 
AvroSerializers

## What is the purpose of the change

The `AvroSerializer` previously had an `AvroSerializer(Class type, Schema schema)` public constructor for the case where it is used for generic records.

This is a bit confusing, because when using the `AvroSerializer`, the type 
to be serialized should always be a `GenericData.Record` type.

This PR fixes this by giving the `AvroSerializer` construction patterns similar to the `AvroDeserializationSchema`, where we have static factory methods for generic and non-generic Avro types.

## Brief change log

- Remove all public constructors from the `AvroSerializer`
- Add `TypeSerializer AvroSerializer.forGeneric(Schema)` 
method to create an `AvroSerializer` for `GenericRecord`s
- Add `TypeSerializer AvroSerializer.forNonGeneric(Class type)` 
method to create an `AvroSerializer` for specific or POJO types.
- A previously deprecated constructor in the `AvroSerializer` is completely 
removed, since it should be assumed that the `AvroSerializer` is an internal 
API and would not require API deprecation.
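
For illustration, usage of the factory methods listed above would look roughly like this (a sketch only; the generic signatures, import paths and the example schema/POJO are assumptions, since the exact generics are not shown here):

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.formats.avro.typeutils.AvroSerializer;

public class AvroSerializerConstructionSketch {

    /** A hypothetical POJO type, for illustration only. */
    public static class MyEvent {
        public String name;
    }

    public static void main(String[] args) {
        // generic records: the Avro schema must be supplied up front
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"f\",\"type\":\"string\"}]}");
        TypeSerializer<GenericData.Record> genericSerializer = AvroSerializer.forGeneric(schema);

        // specific-record or POJO types: only the class is needed
        TypeSerializer<MyEvent> pojoSerializer = AvroSerializer.forNonGeneric(MyEvent.class);
    }
}
```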

## Verifying this change

This is just a code refactor. Required test coverage should not have been affected.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (**yes** / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9569

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6151.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6151


commit 53da570a962226ff81172a209dc1362a0637e110
Author: Tzu-Li (Gordon) Tai 
Date:   2018-06-11T16:04:51Z

wip

commit 1abbcd13da9f2ac69afc318e121c04d59ee903ec
Author: Tzu-Li (Gordon) Tai 
Date:   2018-06-12T08:43:19Z

[FLINK-9569] [avro] Fix confusing construction for GenericRecord 
AvroSerializers




---


[GitHub] flink issue #5845: [FLINK-9168][flink-connectors]Pulsar Sink connector

2018-06-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5845
  
Sorry for the delay in my reply here. I'll take a look this week, over the next few days.
If there is going to be a new PR, please also let me know. Thanks!


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-30 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6043
  
@cjolif do you think it would be possible that, with a clean cut using a REST implementation, we would no longer need separate modules for ES 6.x, 7.x, 8.x and so on?
i.e., it would only be a matter of the user recompiling that REST-based implementation with a different ES client version.

If not, then I would still prefer that we continue with the current approach this PR is proposing, since we need this fix anyway to have Elasticsearch 5.3+ working.


---


[GitHub] flink issue #6038: [FLINK-9394] [e2e] Test rescaling when resuming from exte...

2018-05-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6038
  
@StefanRRichter thanks for the review, will merge this.


---


[GitHub] flink issue #6072: [hotfix][doc] Clarify kinesis docs and fix debugging clas...

2018-05-24 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6072
  
Will only merge the Kinesis doc fix, as the classloading docs have meanwhile received a substantial fix in master.


---


[GitHub] flink pull request #6074: [FLINK-9429] [test] Fix does not properly terminat...

2018-05-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6074#discussion_r190786548
  
--- Diff: flink-end-to-end-tests/test-scripts/elasticsearch-common.sh ---
@@ -61,8 +61,9 @@ function verify_result {
 rm $TEST_DATA_DIR/output
 fi
 
-while : ; do
-  curl 'localhost:9200/index/_search?q=*=21' > 
$TEST_DATA_DIR/output
+# make sure can terminate properly with control-C.
+while [ $? -ne 130 ]; do
--- End diff --

I would prefer that we just wrap the Elasticsearch querying in another function; that improves readability.

Something along the lines of:
```
function fetch_elasticsearch {
    curl 'localhost:9200/index3/_count?q=*' > $TEST_DATA_DIR/output
    echo $(grep '\"count\"' $TEST_DATA_DIR/output | awk '{print $3}' | sed 's/\(.*\),/\1 /')
}

function verify_result {
    local numRecords=$1

    if [ -f "$TEST_DATA_DIR/output" ]; then
        rm $TEST_DATA_DIR/output
    fi

    while (( $(fetch_elasticsearch) < $numRecords )) ; do
        echo "Waiting for Elasticsearch records ..."
        sleep 1
    done
}
```


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-24 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6043
  
One more thing to clarify:
When planning to switch to REST, are we speaking of an implementation that 
works directly against Elasticsearch's REST API? Or are we thinking of using 
Elasticsearch's 
[RestHighLevelClient](https://snapshots.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-high-level-client/7.0.0-alpha1-SNAPSHOT/org/elasticsearch/client/RestHighLevelClient.html)?

I would assume the latter, but IMO we would not be able to avoid yet again 
having a common base module across future versions (e.g. across ES 6.x, 7.x, 
and so on), even if we make a clean cut now.
So, I have the feeling that the main problem here isn't that we are sharing 
code between versions, but the fact that our base shared code isn't 
future-proof enough for potential 3rd party API breaks.

That's the main reason why I'm proposing not to expose Elasticsearch 
classes anymore through base class APIs in the shared module.


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-24 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6043
  
The main reason why the discussion leaned towards the change currently proposed by this PR was that only Elasticsearch 5.6+ supports REST.

Working only towards a clean-cut module that uses REST would mean that we still wouldn't be able to support Elasticsearch 5.2+ up to Elasticsearch 5.5.


---


[GitHub] flink issue #6072: [hotfix][doc] Clarify kinesis docs and fix debugging clas...

2018-05-24 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6072
  
+1, thanks for the fixes @pnowojski.
Will merge this ..


---


[GitHub] flink issue #6061: [FLINK-9425] Make release scripts compliant with ASF rele...

2018-05-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6061
  
+1, thanks for the compliance updates. LGTM.


---


[GitHub] flink issue #6058: [FLINK-9415] Remove reference to StreamingMultipleProgram...

2018-05-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6058
  
+1, LGTM
Merging this ..


---


[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

2018-05-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6040
  
Using `CheckedThread` is preferable, as it simplifies some of the test code.
But yes, the utility was introduced at a later point in time in Flink, so 
some parts of the test code might still be using `Thread`s and 
`AtomicReference`s.


---


[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

2018-05-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6021
  
@fmthoma yes, that would be great.


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190153528
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -80,6 +83,9 @@
/** The queue of unassigned partitions that we need to assign to the 
Kafka consumer. */
private final 
ClosableBlockingQueue<KafkaTopicPartitionState> 
unassignedPartitionsQueue;
 
+   /** The list of partitions to be removed from kafka consumer. */
+   private final Set partitionsToBeRemoved;
--- End diff --

Would it actually make more sense to have a queue for this, like how we are handling newly discovered unassigned partitions via the `unassignedPartitionsQueue`? The fact that this is a set means that we will eventually need to remove entries from it anyway.
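
i.e., something along these lines (a sketch of the field only; the element type is a guess, since generics are stripped in this excerpt):

```
/** The queue of partitions to be removed from the Kafka consumer, mirroring unassignedPartitionsQueue. */
private final ClosableBlockingQueue<KafkaTopicPartition> partitionsToBeRemovedQueue =
        new ClosableBlockingQueue<>();
```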


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190150419
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
Pattern topicPattern,
KeyedDeserializationSchema deserializer,
long discoveryIntervalMillis,
-   boolean useMetrics) {
+   boolean useMetrics,
+   boolean checkUnavailablePartitions) {
--- End diff --

Why do we want this to be configurable? Is there any case that we would 
prefer to leave them untouched?


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190152570
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
 * This method is exposed for testing purposes.
 */
@VisibleForTesting
-   void reassignPartitions(List<KafkaTopicPartitionState> 
newPartitions) throws Exception {
-   if (newPartitions.size() == 0) {
+   void reassignPartitions(List<KafkaTopicPartitionState> 
newPartitions, Set partitionsToBeRemoved) throws Exception {
--- End diff --

I have the feeling that this method is way too complex now, to a point that 
it might make more sense to break this up into 2 different methods - 
`addPartitionsToAssignment` and `removePartitionsFromAssignment`.


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r190146869
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,196 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\""
+JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\""
+JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :"
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function extract_valid_pact_from_job_info_return() {
+PACT_MATCH=0
+if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]];
+then
+PACT_MATCH=$PACT_MATCH
+else
+PACT_MATCH=-1
+fi
+if [[ $1 =~ $JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR ]];
+then
+PACT_MATCH=$PACT_MATCH
+else
+PACT_MATCH=-1
+fi
+echo ${PACT_MATCH}
+}
+
+function extract_valid_job_list_by_type_from_job_list_return() {
+JOB_LIST_MATCH=0
+JOB_LIST_REGEX_EXTRACTOR="$JOB_LIST_REGEX_EXTRACTOR_BY_STATUS $2 $3"
+if [[ $1 =~ $JOB_LIST_REGEX_EXTRACTOR ]];
+then
+JOB_LIST_MATCH=$JOB_LIST_MATCH
+else
+JOB_LIST_MATCH=-1
+fi
+echo ${JOB_LIST_MATCH}
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
--- End diff --

I don't think we need to explicitly shut down the cluster and TMs here; that is already part of the `cleanup` call.


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r190145819
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

Well, we could get a completely normal exit code from the `run` execution while the `-p` option is completely ignored, if we changed the CLI to simply not recognize the option.

This is an extreme case, though.


---


[GitHub] flink pull request #6023: [FLINK-9383][runtime] Test directories in Distribu...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6023#discussion_r190144815
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -93,6 +93,14 @@ if [ $EXIT_CODE == 0 ]; then
 EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+printf 
"\n==\n"
+printf "Running Distributed cache end-to-end test\n"
+printf 
"==\n"
+
$END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh
--- End diff --

Should update this to use the new `run_test` utility we have.
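
i.e., something along the lines of (a sketch, reusing the description and script path from the diff above):

```
run_test "Running Distributed cache end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh"
```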


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r190140597
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set shardIdSet;
+   private List shards;
--- End diff --

Should we move these to be scoped only to the `testGetShardList` test method?


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r190140724
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set shardIdSet;
+   private List shards;
+
+   protected static HashMap<String, String>
+   createInitialSubscribedStreamsToLastDiscoveredShardsState(List 
streams) {
+   HashMap<String, String> initial = new HashMap<>();
+   for (String stream : streams) {
+   initial.put(stream, null);
+   }
+   return initial;
+   }
+
+   private static ListShardsRequestMatcher 
initialListShardsRequestMatcher() {
+   return new ListShardsRequestMatcher(null, null);
+   }
+
+   private static ListShardsRequestMatcher listShardsNextToken(final 
String nextToken) {
--- End diff --

nit: IMO, it would help with readability if we move these private utility 
methods after the main test ones.


---


[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

2018-05-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6021
  
@fmthoma I think this might benefit from actual documentation, not only Javadocs.


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190127059
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link 
RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link 
RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
--- End diff --

I think we can leave `RequestIndexer` as an interface, and make `add(ActionRequest...)` a [default method](https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html).

This would lessen the friction of this breaking change.
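
For illustration, roughly like this (a sketch of the interface variant; it simply reuses the dispatch logic from the abstract class in this diff, so the typed `add(...)` methods stay as they are):

```
import org.apache.flink.annotation.PublicEvolving;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

@PublicEvolving
public interface RequestIndexer {

    /**
     * Kept for backwards compatibility; dispatches to the typed add(...) methods below.
     */
    @Deprecated
    default void add(ActionRequest... actionRequests) {
        for (ActionRequest actionRequest : actionRequests) {
            if (actionRequest instanceof IndexRequest) {
                add((IndexRequest) actionRequest);
            } else if (actionRequest instanceof DeleteRequest) {
                add((DeleteRequest) actionRequest);
            } else if (actionRequest instanceof UpdateRequest) {
                add((UpdateRequest) actionRequest);
            } else {
                throw new IllegalArgumentException("RequestIndexer only supports Index, Delete and Update requests");
            }
        }
    }

    void add(DeleteRequest... deleteRequests);

    void add(IndexRequest... indexRequests);

    void add(UpdateRequest... updateRequests);
}
```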


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190127406
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link 
RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link 
RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
 
/**
 * Add multiple {@link ActionRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
 *
 * @param actionRequests The multiple {@link ActionRequest} to add.
+* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or 
{@Up}
 */
-   void add(ActionRequest... actionRequests);
+   @Deprecated
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (actionRequest instanceof IndexRequest) {
+   add((IndexRequest) actionRequest);
+   } else if (actionRequest instanceof DeleteRequest) {
+   add((DeleteRequest) actionRequest);
+   } else if (actionRequest instanceof UpdateRequest) {
+   add((UpdateRequest) actionRequest);
+   } else {
+   throw new 
IllegalArgumentException("RequestIndexer only supports Index, Delete and Update 
requests");
+   }
+   }
+   }
+
+   /**
+* Add multiple {@link DeleteRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
+*
+* @param deleteRequests The multiple {@link DeleteRequest} to add.
+*/
+   public abstract void add(DeleteRequest... deleteRequests);
--- End diff --

What would be your feeling on not exposing `DeleteRequest`, `IndexRequest`, and `UpdateRequest` directly through the user API?

We could maintain our own way to specify requests, and only create the 
actual ES request instances internally.
It would be more maintenance work for us, but might be safer from a 
future-proof API perspective.


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190126862
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
 ---
@@ -45,12 +48,34 @@
}
 
@Override
-   public void add(ActionRequest... actionRequests) {
-   for (ActionRequest actionRequest : actionRequests) {
+   public void add(DeleteRequest... deleteRequests) {
+   for (DeleteRequest deleteRequest : deleteRequests) {
if (flushOnCheckpoint) {
numPendingRequestsRef.getAndIncrement();
}
-   this.bulkProcessor.add(actionRequest);
+   this.bulkProcessor.add(deleteRequest);
+   }
+   }
+
+   @Override
+   public void add(IndexRequest... indexRequests) {
+   for (IndexRequest indexRequest : indexRequests) {
+   System.out.println("ir: " + indexRequest);
--- End diff --

Leftover print.


---


[GitHub] flink pull request #6043: [FLINK-7386] evolve RequestIndexer API to make it ...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6043#discussion_r190126707
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 ---
@@ -21,18 +21,56 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link 
RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link 
RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
-public interface RequestIndexer {
+public abstract class RequestIndexer {
 
/**
 * Add multiple {@link ActionRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
 *
 * @param actionRequests The multiple {@link ActionRequest} to add.
+* @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or 
{@Up}
--- End diff --

typo at the end.


---


[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6053#discussion_r190126277
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -30,168 +30,50 @@ if [ -z "$FLINK_DIR" ] ; then
 exit 1
 fi
 
-source "$(dirname "$0")"/test-scripts/common.sh
+source "$(dirname "$0")"/test-scripts/test-runner-common.sh
 
 FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd )`" # absolutized and normalized
 
 echo "flink-end-to-end-test directory: $END_TO_END_DIR"
 echo "Flink distribution directory: $FLINK_DIR"
 
-EXIT_CODE=0
-
 # Template for adding a test:
 
-# if [ $EXIT_CODE == 0 ]; then
-#run_test "" "$END_TO_END_DIR/test-scripts/"
-#EXIT_CODE=$?
-# fi
-
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (file, async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (rocks, non-incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (rocks, incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, no parallelism change) 
end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 
file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, no parallelism change) 
end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 
file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, async) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 
file true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file 
false false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks 
false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint after terminal failure (file, 
async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true 
true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
  

[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6053#discussion_r190126071
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh ---
@@ -53,8 +53,6 @@ function cleanup_after_test {
 #
 kill ${watchdog_pid} 2> /dev/null
 wait ${watchdog_pid} 2> /dev/null
-#
-cleanup
--- End diff --

The `test_local_recovery_and_scheduling` test currently bundles several 
executions of the test (e.g. with different state backend configurations) in a 
single run of the test script. That's why it required this cleanup within the 
test itself.

How would the change of this PR affect this?
In general, should we also restructure the e2e tests so that each execution configuration variant is executed via the `test-runner-cleanup#run_test` method (instead of the test cleaning up itself in between executions)?

AFAIK, only the `test_local_recovery_and_scheduling` does this at the 
moment.


---


[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6053#discussion_r190124962
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -31,56 +31,23 @@ if [ -z "$FLINK_DIR" ] ; then
 fi
 
 source "$(dirname "$0")"/test-scripts/common.sh
--- End diff --

I think this can now be removed?


---


[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6053#discussion_r190125404
  
--- Diff: flink-end-to-end-tests/test-scripts/test-runner-common.sh ---
@@ -0,0 +1,76 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(pwd)"/test-scripts/common.sh
+
+###
+# Prints the given description, runs the given test and prints how long 
the execution took.
+# Arguments:
+#   $1: description of the test
+#   $2: command to execute
+###
+function run_test {
+description="$1"
+command="$2"
+
+printf 
"\n==\n"
+printf "Running '${description}'\n"
+printf 
"==\n"
+start_timer
+${command}
+exit_code="$?"
+time_elapsed=$(end_timer)
+
+check_logs_for_errors
+check_logs_for_exceptions
+check_logs_for_non_empty_out_files
--- End diff --

Maybe we should move all these methods:
```
start_timer
end_timer
check_logs_for_errors
check_logs_for_exceptions
check_logs_for_non_empty_out_files
```

to `test-runner-common.sh`, since that's the only place they are used anyway.


---


[GitHub] flink pull request #6058: [FLINK-9415] Remove reference to StreamingMultiple...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6058#discussion_r190123863
  
--- Diff: docs/dev/stream/testing.md ---
@@ -181,7 +181,7 @@ public class ExampleIntegrationTest extends 
StreamingMultipleProgramsTestBase {
 
 
 {% highlight scala %}
-class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase {
+class ExampleIntegrationTest extends 
ScalaStreamingMultipleProgramsTestBase {
--- End diff --

I see that in the code we still have only 1 usage of 
`ScalaStreamingMultipleProgramsTestBase`, but I don't see why it can't be 
replaced with `AbstractTestBase` also (therefore removing 
`ScalaStreamingMultipleProgramsTestBase` and not referencing it anymore in 
docs).

@zentol could you comment here?


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r190120134
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
 ---
@@ -390,6 +398,102 @@ public void 
testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, 
sourceContext.getLatestWatermark().getTimestamp());
}
 
+   @Test
+   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
+   // test data
+   final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+
+   final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+   testCommitData.put(testPartition, 11L);
+
+   // - create the test fetcher -
+
+   @SuppressWarnings("unchecked")
+   SourceContext sourceContext = 
PowerMockito.mock(SourceContext.class);
+   Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+   Collections.singletonMap(testPartition, 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
+   final TestFetcher fetcher = new TestFetcher<>(
+   sourceContext,
+   partitionsWithInitialOffsets,
+   null, /* periodic assigner */
+   null, /* punctuated assigner */
+   new TestProcessingTimeService(),
+   10);
+
+   // - run the fetcher -
+
+   final AtomicReference error = new 
AtomicReference<>();
+   int fetchTasks = 5;
+   final CountDownLatch latch = new CountDownLatch(fetchTasks);
+   ExecutorService service = 
Executors.newFixedThreadPool(fetchTasks + 1);
+
+   service.submit(new Thread("fetcher runner") {
+   @Override
+   public void run() {
+   try {
+   latch.await();
+   fetcher.runFetchLoop();
--- End diff --

So, IMO, the test should look something like this:

```
final OneShotLatch fetchLoopWaitLatch = new OneShotLatch();
final OneShotLatch stateIterationBlockLatch = new OneShotLatch();

final TestFetcher fetcher = new TestFetcher<>(
        sourceContext,
        partitionsWithInitialOffsets,
        null, /* periodic assigner */
        null, /* punctuated assigner */
        new TestProcessingTimeService(),
        10,
        fetchLoopWaitLatch,
        stateIterationBlockLatch);

// - run the fetcher -

final CheckedThread checkedThread = new CheckedThread() {
    @Override
    public void go() throws Exception {
        fetcher.runFetchLoop();
    }
};
checkedThread.start();

// wait until state iteration begins before adding discovered partitions
fetchLoopWaitLatch.await();

fetcher.addDiscoveredPartitions(Collections.singletonList(testPartition));

stateIterationBlockLatch.trigger();
checkedThread.sync();
```


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r190111529
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
 ---
@@ -416,9 +520,16 @@ protected TestFetcher(
false);
}
 
+   /**
+* Emulation of partition's iteration which is required for
+* {@link 
AbstractFetcherTest#testConcurrentPartitionsDiscoveryAndLoopFetching}.
+* @throws Exception
+*/
@Override
public void runFetchLoop() throws Exception {
-   throw new UnsupportedOperationException();
+   for (KafkaTopicPartitionState ignored: 
subscribedPartitionStates()) {
+   Thread.sleep(10L);
--- End diff --

This would only let the test fail "occasionally", right?
I would like this to be changed so that the test always fails without the copy-on-write fix.
We could do this by having a dummy source context that blocks on record emit.
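
Something like the following could work (an illustrative sketch only, not code from this PR; the class name and latch handling are made up):

```
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * A dummy SourceContext whose collect() blocks until released, so the fetch loop is
 * guaranteed to be in the middle of iterating the partition states while another
 * thread calls addDiscoveredPartitions().
 */
final class BlockingSourceContext<T> implements SourceFunction.SourceContext<T> {

    final OneShotLatch inBlockingCollect = new OneShotLatch();
    final OneShotLatch continueCollect = new OneShotLatch();

    private final Object checkpointLock = new Object();

    @Override
    public void collect(T element) {
        inBlockingCollect.trigger();
        try {
            continueCollect.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        collect(element);
    }

    @Override
    public void emitWatermark(Watermark mark) {
        // watermarks are irrelevant for this test
    }

    @Override
    public void markAsTemporarilyIdle() {
        // idleness is irrelevant for this test
    }

    @Override
    public Object getCheckpointLock() {
        return checkpointLock;
    }

    @Override
    public void close() {
        // nothing to close
    }
}
```

The test thread would then `await()` on `inBlockingCollect` before calling `addDiscoveredPartitions(...)`, and finally `trigger()` `continueCollect` to let the fetch loop finish.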


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r190110928
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
 ---
@@ -390,6 +398,102 @@ public void 
testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, 
sourceContext.getLatestWatermark().getTimestamp());
}
 
+   @Test
+   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
+   // test data
+   final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+
+   final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+   testCommitData.put(testPartition, 11L);
+
+   // - create the test fetcher -
+
+   @SuppressWarnings("unchecked")
+   SourceContext sourceContext = 
PowerMockito.mock(SourceContext.class);
--- End diff --

It is unnecessary to use a power mock here. A dummy implementation of a `SourceContext` would be better.


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r190112933
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -507,7 +507,11 @@ private void updateMinPunctuatedWatermark(Watermark 
nextWatermark) {
SerializedValue<AssignerWithPunctuatedWatermarks> 
watermarksPunctuated,
ClassLoader userCodeClassLoader) throws IOException, 
ClassNotFoundException {
 
-   List<KafkaTopicPartitionState> partitionStates = new 
LinkedList<>();
+   /**
+*  CopyOnWrite as adding discovered partitions could happen in 
parallel
+*  with different threads iterating by {@link 
AbstractFetcher#subscribedPartitionStates} results
+*/
--- End diff --

I think we usually don't have Javadoc blocks within methods. A regular 
comment with `//` would do.
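
i.e., just something like (a sketch; the generic parameter is an assumption, since generics are stripped in this excerpt):

```
// use a copy-on-write list: addDiscoveredPartitions() may add entries concurrently
// while other threads iterate over subscribedPartitionStates()
List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
```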


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r190111239
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
 ---
@@ -390,6 +398,102 @@ public void 
testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, 
sourceContext.getLatestWatermark().getTimestamp());
}
 
+   @Test
+   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
+   // test data
+   final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+
+   final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+   testCommitData.put(testPartition, 11L);
+
+   // - create the test fetcher -
+
+   @SuppressWarnings("unchecked")
+   SourceContext sourceContext = 
PowerMockito.mock(SourceContext.class);
+   Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+   Collections.singletonMap(testPartition, 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
+   final TestFetcher fetcher = new TestFetcher<>(
+   sourceContext,
+   partitionsWithInitialOffsets,
+   null, /* periodic assigner */
+   null, /* punctuated assigner */
+   new TestProcessingTimeService(),
+   10);
+
+   // - run the fetcher -
+
+   final AtomicReference error = new 
AtomicReference<>();
+   int fetchTasks = 5;
+   final CountDownLatch latch = new CountDownLatch(fetchTasks);
+   ExecutorService service = 
Executors.newFixedThreadPool(fetchTasks + 1);
+
+   service.submit(new Thread("fetcher runner") {
+   @Override
+   public void run() {
+   try {
+   latch.await();
+   fetcher.runFetchLoop();
+   } catch (Throwable t) {
+   error.set(t);
+   }
+   }
+   });
+
+   for (int i = 0; i < fetchTasks; i++) {
+   service.submit(new Thread("add partitions " + i) {
+   @Override
+   public void run() {
+   try {
+   List 
newPartitions = new ArrayList<>();
+   for (int i = 0; i < 1000; i++) {
+   
newPartitions.add(testPartition);
+   }
+   
fetcher.addDiscoveredPartitions(newPartitions);
+   latch.countDown();
+   for (int i = 0; i < 100; i++) {
+   
fetcher.addDiscoveredPartitions(newPartitions);
+   Thread.sleep(1L);
+   }
+   } catch (Throwable t) {
+   error.set(t);
+   }
+   }
+   });
+   }
+
+   service.awaitTermination(1L, TimeUnit.SECONDS);
+
+   // - trigger the offset commit -
--- End diff --

We should be able to ignore offset commit triggering in this test.


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r190120209
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
 ---
@@ -390,6 +398,102 @@ public void 
testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, 
sourceContext.getLatestWatermark().getTimestamp());
}
 
+   @Test
+   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
+   // test data
+   final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+
+   final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+   testCommitData.put(testPartition, 11L);
+
+   // - create the test fetcher -
+
+   @SuppressWarnings("unchecked")
+   SourceContext sourceContext = 
PowerMockito.mock(SourceContext.class);
+   Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+   Collections.singletonMap(testPartition, 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
+   final TestFetcher fetcher = new TestFetcher<>(
+   sourceContext,
+   partitionsWithInitialOffsets,
+   null, /* periodic assigner */
+   null, /* punctuated assigner */
+   new TestProcessingTimeService(),
+   10);
+
+   // - run the fetcher -
+
+   final AtomicReference error = new 
AtomicReference<>();
+   int fetchTasks = 5;
+   final CountDownLatch latch = new CountDownLatch(fetchTasks);
+   ExecutorService service = 
Executors.newFixedThreadPool(fetchTasks + 1);
+
+   service.submit(new Thread("fetcher runner") {
+   @Override
+   public void run() {
+   try {
+   latch.await();
+   fetcher.runFetchLoop();
--- End diff --

The final `checkedThread.sync()` would always fail with the 
`ConcurrentModificationException` if the test is designed like this.


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r190111078
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
 ---
@@ -390,6 +398,102 @@ public void 
testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, 
sourceContext.getLatestWatermark().getTimestamp());
}
 
+   @Test
+   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
+   // test data
+   final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+
+   final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+   testCommitData.put(testPartition, 11L);
+
+   // - create the test fetcher -
+
+   @SuppressWarnings("unchecked")
+   SourceContext sourceContext = 
PowerMockito.mock(SourceContext.class);
+   Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+   Collections.singletonMap(testPartition, 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
+   final TestFetcher fetcher = new TestFetcher<>(
+   sourceContext,
+   partitionsWithInitialOffsets,
+   null, /* periodic assigner */
+   null, /* punctuated assigner */
+   new TestProcessingTimeService(),
+   10);
+
+   // - run the fetcher -
+
+   final AtomicReference error = new 
AtomicReference<>();
--- End diff --

Flink provides a `CheckedThread` utility, so you don't have to collect thread 
errors in an `AtomicReference` yourself.
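
For illustration, a minimal sketch of the `CheckedThread` pattern (hedged: not 
the actual test code; `fetcher` refers to the `TestFetcher` created above):

```java
import org.apache.flink.core.testutils.CheckedThread;

// Any exception thrown inside go() is captured by the CheckedThread and
// rethrown by sync(), so no manual AtomicReference<Throwable> is needed.
final CheckedThread fetcherThread = new CheckedThread("fetcher runner") {
    @Override
    public void go() throws Exception {
        fetcher.runFetchLoop();
    }
};
fetcherThread.start();

// ... drive partition discovery from the main test thread ...

fetcherThread.sync(); // rethrows whatever go() threw, failing the test with the real cause
```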


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r190114844
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
 ---
@@ -390,6 +398,102 @@ public void 
testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
assertEquals(100, 
sourceContext.getLatestWatermark().getTimestamp());
}
 
+   @Test
+   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
+   // test data
+   final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+
+   final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+   testCommitData.put(testPartition, 11L);
+
+   // - create the test fetcher -
+
+   @SuppressWarnings("unchecked")
+   SourceContext sourceContext = 
PowerMockito.mock(SourceContext.class);
+   Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+   Collections.singletonMap(testPartition, 
KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
+   final TestFetcher fetcher = new TestFetcher<>(
+   sourceContext,
+   partitionsWithInitialOffsets,
+   null, /* periodic assigner */
+   null, /* punctuated assigner */
+   new TestProcessingTimeService(),
+   10);
+
+   // - run the fetcher -
+
+   final AtomicReference error = new 
AtomicReference<>();
+   int fetchTasks = 5;
+   final CountDownLatch latch = new CountDownLatch(fetchTasks);
+   ExecutorService service = 
Executors.newFixedThreadPool(fetchTasks + 1);
+
+   service.submit(new Thread("fetcher runner") {
+   @Override
+   public void run() {
+   try {
+   latch.await();
+   fetcher.runFetchLoop();
--- End diff --

The sequence here seems a bit odd to me.

I think we should be testing this as follows:
1. Run the fetch loop, and let it be blocked on record emitting (which then 
should let it be blocked mid-iteration)
2. Add a discovered partition; this should not throw an exception.


---


[GitHub] flink issue #6045: [FLINK-9402] [kinesis] Kinesis consumer configuration req...

2018-05-22 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6045
  
Thanks @tweise.
I have only one comment; otherwise this looks good to merge.


---


[GitHub] flink pull request #6045: [FLINK-9402] [kinesis] Kinesis consumer configurat...

2018-05-22 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6045#discussion_r189815405
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -244,7 +244,11 @@ public static void validateAwsConfiguration(Properties 
config) {
}
 
if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
-   throw new IllegalArgumentException("The AWS region ('" 
+ AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+   if 
(!config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT)) {
+   // per validation in AwsClientBuilder
+   throw new 
IllegalArgumentException(String.format("Either AWS region ('%s') or AWS 
endpoint ('%s') must be set in the config.",
+   AWSConfigConstants.AWS_REGION, 
AWSConfigConstants.AWS_REGION));
+   }
} else {
--- End diff --

Do we also need to check that `AWS_REGION` and `AWS_ENDPOINT` are not both set?
(Since the AwsClientBuilder says that ONLY ONE of the two may be set.)
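
For illustration, the extra check could look roughly like this inside 
`validateAwsConfiguration(Properties)` (a hedged sketch; the exact message 
wording is illustrative):

```java
if (config.containsKey(AWSConfigConstants.AWS_REGION)
        && config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT)) {
    // mirror the AwsClientBuilder contract: region and endpoint are mutually exclusive
    throw new IllegalArgumentException(String.format(
        "Only one of the AWS region ('%s') or the AWS endpoint ('%s') may be set in the config.",
        AWSConfigConstants.AWS_REGION, ConsumerConfigConstants.AWS_ENDPOINT));
}
```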


---


[GitHub] flink issue #5977: [FLINK-9295][kafka] Fix transactional.id collisions for F...

2018-05-22 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5977
  
Thanks for the update @pnowojski.
Changes LGTM, +1.

Merging this ..


---


[GitHub] flink issue #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to end tes...

2018-05-22 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5761
  
+1, LGTM, merging ...


---


[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

2018-05-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5849
  
@walterddr yes, you can do that. In the description, just leave a note that the 
PR is based on another one, and point out which of the commits are relevant.


---


[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958
  
@FredTing we had some offline discussion on how to proceed with this.
@aljoscha, @twalthr, or @StephanEwen can probably comment more here if I 
missed anything.

The conflict that Stephan mentioned between a "common deserialization 
schema" interface and surfacing connector-specific information is rooted in the 
fact that both concerns (deserialization, and providing connector-specific 
record meta information) are currently coupled in a single interface.

Take, for example, the Kafka connector's `KeyedDeserializationSchema`: there we 
try to deserialize the Kafka bytes, and also provide information such as the 
topic / partition / timestamp to allow the user to enrich their records for 
downstream business logic. The first part (deserialization of the bytes) should 
be common to all connector sources, while the second part is Kafka-specific.

Therefore, we should perhaps break this up into two separate interfaces, as 
follows:
```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```

The second interface is something that each connector should have 
independently, and does not handle deserialization of the record bytes. The 
name, of course, is still open to discussion.
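
Purely for illustration, the Kafka consumer could then wire the two interfaces 
together per record roughly like this (hedged: `ConsumerRecordMetaInfo` and its 
constructor are hypothetical, not an existing Flink API):

```java
// inside the Kafka fetcher, for each ConsumerRecord<byte[], byte[]> record (sketch only):
T value = deserializationSchema.deserialize(record.value());
T enriched = metaInfoProvider.enrich(
    value,
    new ConsumerRecordMetaInfo(record.topic(), record.partition(), record.offset(), record.timestamp()));
sourceContext.collect(enriched);
```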

What do you think?


---


[GitHub] flink issue #5849: [FLINK-8986][e2e-test][WIP] Flink end to end test REST AP...

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5849
  
Hi @walterddr, what is the status of this PR? It would be nice if we could move 
forward with it (and also with the CLI e2e test PR that you also opened).


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185140
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

Since this is a detached execution, we probably want to wait until this job 
completes before continuing?


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185253
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+  -c org.apache.flink.examples.java.wordcount.WordCount \
+  $FLINK_DIR/examples/batch/WordCount.jar \
+  --input file:///$FLINK_DIR/README.txt \
+  --output file:///${TEST_DATA_DIR}/out/result"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test information APIs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink info 
$FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list"
--- End diff --

Should we verify the output of `list`?


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185689
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+  -c org.apache.flink.examples.java.wordcount.WordCount \
+  $FLINK_DIR/examples/batch/WordCount.jar \
+  --input file:///$FLINK_DIR/README.txt \
+  --output file:///${TEST_DATA_DIR}/out/result"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test information APIs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink info 
$FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list -s"
+EXIT_CODE=$?
+fi
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink list -r"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test operation on running streaming jobs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+RETURN=`$FLINK_DIR/bin/flink run -d \
+

[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185232
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
+  -c org.apache.flink.examples.java.wordcount.WordCount \
+  $FLINK_DIR/examples/batch/WordCount.jar \
+  --input file:///$FLINK_DIR/README.txt \
+  --output file:///${TEST_DATA_DIR}/out/result"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test information APIs\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink info 
$FLINK_DIR/examples/batch/WordCount.jar"
--- End diff --

Should we verify the output of `info`?


---


[GitHub] flink pull request #5863: [FLINK-8985][e2etest] initial support for End-to-e...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r189185047
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

There probably should be some verification that the job actually runs with 
DOP=4


---

