[GitHub] flink issue #5823: [FLINK-9008] [e2e] Implements quickstarts end to end test

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5823
  
Thanks @zhangminglei. This looks good to merge; I will proceed to merge it.


---


[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5374
  
@cjolif since we have now reached a conclusion on how the Elasticsearch 
connector should be improved going forward, could you maybe close this PR? I 
assume a new PR will be opened that subsumes this one.


---


[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

2018-05-18 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6040
  
@snuyanzin the failing `YARNSessionCapacitySchedulerITCase` is known to be a 
bit flaky, so you can safely ignore it for now. I'll take another look at 
your changes soon. Thanks!


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-05-18 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r189177308
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void checkQueueLimit() {
+   while (producer.getOutstandingRecordsCount() >= queueLimit) {
+   producer.flush();
--- End diff --

When we add a record to the producer queue via 
`producer.addUserRecord(...)`, we get a callback. We can use that callback to 
notify the blocking operation in `checkQueueLimit`.
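
Roughly what I have in mind, as a sketch only (the `queueLimitLock` field and 
the placeholder variables are hypothetical; `addUserRecord` and 
`UserRecordResult` are the existing KPL API):

```java
import java.nio.ByteBuffer;

import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

// hypothetical field on the sink, shared with checkQueueLimit():
private final Object queueLimitLock = new Object();

// in invoke(), where the record is handed to the KPL
// (stream / partitionKey / payload are placeholders):
ListenableFuture<UserRecordResult> addFuture =
        producer.addUserRecord(stream, partitionKey, ByteBuffer.wrap(payload));

Futures.addCallback(addFuture, new FutureCallback<UserRecordResult>() {
    @Override
    public void onSuccess(UserRecordResult result) {
        // the record has left the KPL's internal queue;
        // wake up a potentially blocked checkQueueLimit()
        synchronized (queueLimitLock) {
            queueLimitLock.notifyAll();
        }
    }

    @Override
    public void onFailure(Throwable t) {
        // record the async error as before, and also wake up the waiter
        synchronized (queueLimitLock) {
            queueLimitLock.notifyAll();
        }
    }
}, MoreExecutors.directExecutor());
```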


---


[GitHub] flink issue #5941: [FLINK-8971] [e2e] Include broadcast / union state in gen...

2018-05-17 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5941
  
I think it is safe to merge this change.
Will merge this now ..


---


[GitHub] flink issue #5955: [FLINK-8659] Add migration itcases for broadcast state.

2018-05-17 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5955
  
@kl0u yes, let's do that as a separate commit then.

+1, this looks good to me.

One final comment for the merge:
When merging to `master`, we should have test savepoints for both `1.5` 
(taken on the `release-1.5` branch) and `1.6` (taken on the current `master` 
branch).


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-05-17 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r189163394
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -218,6 +232,8 @@ public void invoke(OUT value, Context context) throws 
Exception {
throw new RuntimeException("Kinesis producer has been 
closed");
}
 
+   checkAndPropagateAsyncError();
+   checkQueueLimit();
checkAndPropagateAsyncError();
--- End diff --

This second invocation is to catch any async errors that occurred during the 
queue flush, correct?
If so, we should probably move it into `checkQueueLimit`, to make the 
relationship between the flush and the error check clearer.
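
In other words, something along these lines (sketch):

```java
private void checkQueueLimit() throws Exception {
    while (producer.getOutstandingRecordsCount() >= queueLimit) {
        producer.flush();
        // surface any async error that occurred while the queue was draining,
        // instead of relying on a second call in invoke()
        checkAndPropagateAsyncError();
    }
}
```

`invoke()` would then only need the single `checkAndPropagateAsyncError()` 
call before `checkQueueLimit()`.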


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-05-17 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r189164871
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +342,24 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void checkQueueLimit() {
+   while (producer.getOutstandingRecordsCount() >= queueLimit) {
+   producer.flush();
--- End diff --

Do we have to do a flush here? Shouldn't the KPL child process process the 
user records in the background without an explicit flush call?

If so, perhaps a more graceful solution here is to wait on a local object and 
have the asynchronous producer write callbacks notify it. After being 
notified, we check `getOutstandingRecordsCount` against the `queueLimit`, and 
either wait more or escape the loop.
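
Sketched out, that could look roughly like this (illustrative only; 
`queueLimitLock` would be a new field that the asynchronous write callbacks 
notify):

```java
private void checkQueueLimit() throws InterruptedException {
    // let the KPL drain records in the background instead of forcing a flush,
    // so that record aggregation is not broken
    synchronized (queueLimitLock) {
        while (producer.getOutstandingRecordsCount() >= queueLimit) {
            // bounded wait guards against missed notifications
            queueLimitLock.wait(100);
        }
    }
}
```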

What do you think?


---


[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

2018-05-17 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6040
  
Thanks for the PR @snuyanzin!
I had some comments, please let me know what you think.

Also, some general contribution tips:
1. I would suggest a PR title along the lines of 
"[FLINK-9349] [kafka] Fix ConcurrentModificationException when adding 
discovered partitions". That makes it immediately clear what exactly is being 
fixed.
2. The message of the first commit of the PR should also be set to something 
similar to the title (for a single-commit PR, the PR title and the commit 
message can usually be identical).


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-17 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r189031751
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Flink9349Test}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Flink9349Test {
+   @Test
+   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
--- End diff --

Likewise, Kafka 08 / 09 / 010 / 011 should all have this test coverage.


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-17 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r189029077
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark 
nextWatermark) {
SerializedValue<AssignerWithPunctuatedWatermarks> 
watermarksPunctuated,
ClassLoader userCodeClassLoader) throws IOException, 
ClassNotFoundException {
 
-   List<KafkaTopicPartitionState> partitionStates = new 
LinkedList<>();
+   List<KafkaTopicPartitionState> partitionStates = new 
CopyOnWriteArrayList<>();
--- End diff --

It would be nice to have a comment explaining why we need to use a 
`CopyOnWriteArrayList` here.
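
For example, something like this (suggested wording only, based on my 
understanding of the issue):

```java
// Use a CopyOnWriteArrayList here: the fetcher's main loop iterates over this
// list while the partition discoverer may concurrently append newly discovered
// partitions; a plain LinkedList would throw ConcurrentModificationException.
List<KafkaTopicPartitionState> partitionStates = new CopyOnWriteArrayList<>();
```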


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-17 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r189031075
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Flink9349Test}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Flink9349Test {
+   @Test
+   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
--- End diff --

I think we should have a similar test, but move it to `Kafka09FetcherTest`.


---


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-17 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6040#discussion_r189031464
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.anyLong;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Unit tests for the {@link Flink9349Test}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConsumerThread.class)
+public class Flink9349Test {
+   @Test
+   public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws 
Exception {
+
+   // test data
+   final KafkaTopicPartition testPartition = new 
KafkaTopicPartition("test", 42);
+   final Map<KafkaTopicPartition, Long> testCommitData = new 
HashMap<>();
+   testCommitData.put(testPartition, 11L);
+
+   // to synchronize when the consumer is in its blocking method
+   final OneShotLatch sync = new OneShotLatch();
+
+   // - the mock consumer with blocking poll calls 
+   final MultiShotLatch blockerLatch = new MultiShotLatch();
+
+   KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+   when(mockConsumer.poll(anyLong())).thenAnswer(new 
Answer<ConsumerRecords>() {
+
+   @Override
+   public ConsumerRecords answer(InvocationOnMock 
invocation) throws InterruptedException {
+   sync.trigger();
+   blockerLatch.await();
+   return ConsumerRecords.empty();
+   }
+   });
+
+   doAnswer(new Answer() {
+   @Override
+   public Void answer(InvocationOnMock invocation) {

[GitHub] flink pull request #6038: [FLINK-9394] [e2e] Test rescaling when resuming fr...

2018-05-17 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6038

[FLINK-9394] [e2e] Test rescaling when resuming from externalized 
checkpoints

## What is the purpose of the change

This PR further extends the `test_resume_externalized_checkpoints.sh` e2e 
test to cover rescaling cases.

## Brief change log

- Allow specifying old / new parallelism and state backend configuration 
for the test script
- Modify nightly test script

## Verifying this change

This is a simple extension to existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9394

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6038.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6038


commit eb738232d864f2811987927ee1ea6352a3a041f7
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-17T16:33:12Z

[FLINK-9394] [e2e] Test rescaling when resuming from externalized 
checkpoints




---


[GitHub] flink issue #6038: [FLINK-9394] [e2e] Test rescaling when resuming from exte...

2018-05-17 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6038
  
r: @StefanRRichter 


---


[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6008#discussion_r188548653
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -33,8 +33,8 @@ else
   NUM_SLOTS=$NEW_DOP
 fi
 
-STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file}
-STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true}
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
--- End diff --

We should also update the usage message at the beginning of this file to 
reflect these extra parameters.


---


[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511081
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   ca

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512949
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   ca

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511258
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   ca

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512862
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   ca

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512804
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   ca

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511280
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   ca

[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6008#discussion_r188507930
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -43,121 +43,151 @@ EXIT_CODE=0
 # printf 
"\n==\n"
 # printf "Running my fancy nightly end-to-end test\n"
 # printf 
"==\n"
+# start_timer
 # $END_TO_END_DIR/test-scripts/test_something_very_fancy.sh
 # EXIT_CODE=$?
+# end_timer
 # fi
 
 
 if [ $EXIT_CODE == 0 ]; then
 printf 
"\n==\n"
 printf "Running HA end-to-end test\n"
 printf 
"==\n"
+start_timer
 $END_TO_END_DIR/test-scripts/test_ha.sh
 EXIT_CODE=$?
+end_timer
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
   printf "Running Resuming Savepoint (file, async, no parallelism change) 
end-to-end test\n"
   printf 
"==\n"
+  start_timer
   STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
+  end_timer
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
   printf "Running Resuming Savepoint (file, sync, no parallelism change) 
end-to-end test\n"
   printf 
"==\n"
+  start_timer
   STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
+  end_timer
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
   printf "Running Resuming Savepoint (file, async, scale up) end-to-end 
test\n"
   printf 
"==\n"
+  start_timer
   STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
   EXIT_CODE=$?
+  end_timer
 fi
--- End diff --

Perhaps we can refactor this whole block into a common base script for the 
nightly / pre-commit hook scripts.
The end_timer / start_timer functions should also be located there.


---


[GitHub] flink issue #6008: [FLINK-9354][travis] Print execution times for nightly E2...

2018-05-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6008
  
I see these in the Travis logs:
```
flink-end-to-end-tests/run-pre-commit-tests.sh: line 94: start_timer: command not found
...
flink-end-to-end-tests/run-pre-commit-tests.sh: line 97: end_timer: command not found
```

I think you'll have to include `common.sh` in the nightly / pre-commit hook 
scripts.


---


[GitHub] flink issue #6018: [FLINK-9372] Typo on Elasticsearch website link (elastic....

2018-05-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6018
  
Thanks for catching this @medcv!
Apparently elastic.io directs to a completely different website 😅.
Merging this ..


---


[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5977#discussion_r188205000
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -71,6 +71,17 @@
@PublicEvolving
MetricGroup getMetricGroup();
 
+   /**
+* Returned value is guaranteed to be unique between operators within 
the same job and to be
+* stable and the same across job submissions.
+*
+* This operation is currently only supported in Streaming 
(DataStream) contexts.
+*
+* @return String representation of the operator's unique id.
+*/
+   @PublicEvolving
+   String getOperatorUniqueID();
--- End diff --

I'm slightly leaning towards Stephan's suggestion, which I also agree is 
the better solution for this case.

It might be ok to have this as a "hidden" API for now anyway, since 1) it 
is marked `@PublicEvolving`, and 2) the API was added in quite a short 
timeframe.
If we want this fix in 1.5, I wouldn't suggest "fully" exposing it.


---


[GitHub] flink issue #6002: [FLINK-9350] Parameter baseInterval has wrong check messa...

2018-05-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6002
  
LGTM, merging ..


---


[GitHub] flink issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend restore probl...

2018-05-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5984
  
Hi @Myasuka, it seems like your local branch is not properly rebased on the 
latest master.
Could you try rebasing again and reopening the PR?


---


[GitHub] flink issue #6008: [FLINK-9354][travis] Print execution times for nightly E2...

2018-05-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6008
  
+1 LGTM


---


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188195971
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '// {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+\
+org.apache.flink\

+flink-connector-elasticsearch2_${scala.binary.version}\
+${flink.version}\
+' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c 
"org/apache/flink/quickstart/ElasticsearchStreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" 
contentsInJar.txt` -eq '0' ]]; then
+
+echo "Failure: Since ElasticsearchStreamingJob.class and other user 
classes are not included in the jar. "
+exit 1
+else
+echo "Success: ElasticsearchStreamingJob.class and other user classes 
are included in the jar."
+fi
+
+cd $CURRENT_DIR
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz;
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -c 
org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_P

[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188195534
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+<dependency>\
+<groupId>org.apache.flink</groupId>\
+<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\
+<version>${flink.version}</version>\
+</dependency>' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
--- End diff --

also set `PASS=""` if you want to fail the e2e test


---


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188195999
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+<dependency>\
+<groupId>org.apache.flink</groupId>\
+<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\
+<version>${flink.version}</version>\
+</dependency>' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c 
"org/apache/flink/quickstart/ElasticsearchStreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" 
contentsInJar.txt` -eq '0' ]]; then
+
+echo "Failure: Since ElasticsearchStreamingJob.class and other user 
classes are not included in the jar. "
+exit 1
+else
+echo "Success: ElasticsearchStreamingJob.class and other user classes 
are included in the jar."
+fi
+
+cd $CURRENT_DIR
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -c 
org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_P

[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188196219
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.{JobManager, 
SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistryImpl
--- End diff --

Is this change unrelated to this PR? I would prefer to have it as a separate hotfix.


---


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188196115
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+<dependency>\
+<groupId>org.apache.flink</groupId>\
+<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\
+<version>${flink.version}</version>\
+</dependency>' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c 
"org/apache/flink/quickstart/ElasticsearchStreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" 
contentsInJar.txt` -eq '0' ]]; then
+
+echo "Failure: Since ElasticsearchStreamingJob.class and other user 
classes are not included in the jar. "
+exit 1
+else
+echo "Success: ElasticsearchStreamingJob.class and other user classes 
are included in the jar."
+fi
+
+cd $CURRENT_DIR
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+# run the Flink job
--- End diff --

Before running the Flink job, we should verify that the Elasticsearch node 
really is running.
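
Something along these lines should do it (a rough sketch; the port assumes Elasticsearch's default HTTP settings):

```bash
# wait up to ~60 seconds for the Elasticsearch REST endpoint to respond
for ((i = 0; i < 60; i++)); do
    if curl -s "http://localhost:9200" > /dev/null; then
        echo "Elasticsearch node is up."
        break
    fi
    sleep 1
done

if ! curl -s "http://localhost:9200" > /dev/null; then
    echo "Failure: Elasticsearch node did not start within 60 seconds."
    PASS=""
    exit 1
fi
```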


---


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188194545
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
--- End diff --

This should be `1.6-SNAPSHOT` now?


---


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188195349
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
--- End diff --

The `$CURRENT_DIR` can be anywhere, so this path is basically invalid.
In the e2e test scripts we should reference paths to built jars / files relative to `$TEST_INFRA_DIR`.
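
i.e. something like the other e2e scripts do (sketch only; `QUICKSTART_TEST_SRC` is just an illustrative variable name, and the destination is the generated quickstart project):

```bash
# resolve the test class relative to the test infra directory, not the script's cwd
QUICKSTART_TEST_SRC=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java

cp $QUICKSTART_TEST_SRC flink-java-project/src/main/java/org/apache/flink/quickstart/
```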


---


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188194072
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
--- End diff --

I would suggest generating the mvn project under `$TEST_DATA_DIR`. Then it 
will be properly cleaned up after the test completes.
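
i.e., roughly like this (sketch only; `common.sh` needs to be sourced first so that `$TEST_DATA_DIR` is defined), so that the generated project lives inside the directory that the cleanup step already removes:

```bash
source "$(dirname "$0")"/common.sh

mkdir -p "$TEST_DATA_DIR"
cd "$TEST_DATA_DIR"

mvn archetype:generate                              \
    -DarchetypeGroupId=org.apache.flink             \
    -DarchetypeArtifactId=flink-quickstart-java     \
    -DarchetypeVersion=1.6-SNAPSHOT                 \
    -DgroupId=org.apache.flink.quickstart           \
    -DartifactId=flink-java-project                 \
    -Dversion=0.1                                   \
    -Dpackage=org.apache.flink.quickstart           \
    -DinteractiveMode=false

cd "$TEST_DATA_DIR/flink-java-project"
```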


---


[GitHub] flink issue #5823: [FLINK-9008] [e2e] Implements quickstarts end to end test

2018-05-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5823
  
@medcv That sounds like something we should do as part of the release 
process (if we are going to do it), orthogonal to this PR here.


---


[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5761#discussion_r187848661
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch1.sh ---
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for ElasticsearchSink1.x.
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
+echo "Downing Elasticsearch from $ELASTICSEARCH_URL"
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1
+
+# start elasticsearch cluster
+$ELASTICSEARCH_DIR/bin/elasticsearch -daemon
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/../flink-elasticsearch1-test/target/Elasticsearch1SinkExample.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR \
+  --index index \
+  --type type
+
+touch $TEST_DATA_DIR/output
+
+curl 'localhost:9200/index/_search?q=*=21' > 
$TEST_DATA_DIR/output
+
+if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
+echo "Elasticsearch end to end test pass."
+fi
--- End diff --

we should also have the `else` case, where `PASS` is set to `""`, so that 
the test properly fails.
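
i.e., roughly (sketch):

```bash
if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
    echo "Elasticsearch end to end test pass."
else
    echo "Elasticsearch end to end test failed."
    PASS=""
    exit 1
fi
```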


---


[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5761#discussion_r187847421
  
--- Diff: 
flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/elasticsearch1/test/Elasticsearch1SinkExample.java
 ---
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.elasticsearch.examples;
+package org.apache.flink.elasticsearch1.test;
--- End diff --

I think the package should be:
`org.apache.flink.streaming.tests`

This would then be consistent with our other end-to-end test jobs.


---


[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5761#discussion_r187848008
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch1.sh ---
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for ElasticsearchSink1.x.
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
+echo "Downing Elasticsearch from $ELASTICSEARCH_URL"
--- End diff --

`Downing` --> `Downloading`


---


[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5761#discussion_r187847498
  
--- Diff: 
flink-end-to-end-tests/flink-elasticsearch1-test/src/main/java/org/apache/flink/elasticsearch1/test/Elasticsearch1SinkExample.java
 ---
@@ -37,20 +38,33 @@
 import java.util.Map;
 
 /**
- * This example shows how to use the Elasticsearch Sink. Before running it 
you must ensure that
- * you have a cluster named "elasticsearch" running or change the cluster 
name in the config map.
+ * End to end test for Elasticsearch1Sink.
+ *
+ * This example shows how to use the Elasticsearch1 Sink from an user 
endpoint. Before running it you
+ * must ensure that you have a cluster named "elasticsearch" running or 
change the cluster name in the config map.
--- End diff --

This Javadoc doesn't make sense for a job that is simply used in end-to-end tests.


---


[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5761#discussion_r187848355
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch1.sh ---
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for ElasticsearchSink1.x.
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"
+echo "Downing Elasticsearch from $ELASTICSEARCH_URL"
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-1.7.1
+
+# start elasticsearch cluster
+$ELASTICSEARCH_DIR/bin/elasticsearch -daemon
--- End diff --

There is no verification that the elasticsearch node is really up and ready 
before continuing.
This could lead to brittle tests.


---


[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5761#discussion_r187847923
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch1.sh ---
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for ElasticsearchSink1.x.
--- End diff --

Can we have a single script for all Elasticsearch versions?
It doesn't seem right that we have 3 scripts with a lot of duplicated bash 
code.
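
E.g. one parameterized script that takes the version-specific bits as arguments; the script name and argument names below are only for illustration (the way the node is started also differs slightly per version and could be a small case statement):

```bash
# test_streaming_elasticsearch.sh <es-version> <download-url>
ES_VERSION=$1
ELASTICSEARCH_URL=$2

source "$(dirname "$0")"/common.sh

start_cluster
mkdir -p $TEST_DATA_DIR

echo "Downloading Elasticsearch from $ELASTICSEARCH_URL"
curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/

# only the test jar location differs per version; everything else is shared
TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-elasticsearch${ES_VERSION}-test/target/Elasticsearch${ES_VERSION}SinkExample.jar

$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR \
  --index index \
  --type type
```

The per-version entries in the nightly test runner would then just invoke this script with the right version and download URL.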


---


[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5761#discussion_r187847586
  
--- Diff: flink-end-to-end-tests/flink-elasticsearch2-test/pom.xml ---
@@ -0,0 +1,140 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-elasticsearch2-test_${scala.binary.version}</artifactId>
+	<name>flink-elasticsearch2-test</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<elasticsearch.version>2.3.5</elasticsearch.version>
--- End diff --

I don't think we need to make this configurable in the tests.


---


[GitHub] flink pull request #5761: [FLINK-8989] [e2eTests] Elasticsearch1&2&5 end to ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5761#discussion_r187847815
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -53,6 +53,30 @@ if [ $EXIT_CODE == 0 ]; then
 EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+printf 
"\n==\n"
+printf "Running Resuming Savepoint (no parallelism change) end-to-end 
test\n"
+printf 
"==\n"
+$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
+EXIT_CODE=$?
+fi
--- End diff --

The changes in this file seem to be unintended and should be removed.


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r187845478
  
--- Diff: 
flink-end-to-end-tests/flink-local-recovery-and-allocation-test/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ *
+ * List of possible input parameters for this job:
+ * 
+ * checkpointDir: the checkpoint directory, required.
+ * parallelism: the parallelism of the job, default 1.
+ * maxParallelism: the maximum parallelism of the job, default 1.
+ * checkpointInterval: the checkpointing interval in 
milliseconds, default 1000.
+ * restartDelay: the delay of the fixed delay restart 
strategy, default 0.
+ * externalizedCheckpoints: flag to activate externalized 
checkpoints, default false.
+ * stateBackend: choice for state backend between 
file and rocks, default file.
+ * killJvmOnFail: flag that determines whether or not an 
artificial failure induced by the test kills the JVM or not.
+ * asyncCheckpoints: flag for async checkpoints with file 
state backend, default true.
+ * incrementalCheckpoints: flag for incremental checkpoint 
with rocks state backend, default false.
+ * delay: sleep delay to throttle down the production of the 
source, default 0.
+ * maxAttempts: the maximum number of run attempts, before the 
job finishes with success, default 3.
+ * valueSize: size of the artificial value for each key in 
bytes, default 10.
+ * 
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
 

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r187843886
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh ---
@@ -0,0 +1,115 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+# This function checks the logs for entries that indicate problems with 
local recovery
+function check_logs {
+  local parallelism=$1
+  local attempts=$2
+  (( expected_count=parallelism * (attempts + 1) ))
+
+  # Search for the log message that indicates restore problem from 
existing local state for the keyed backend.
+  local failed_local_recovery=$(grep '^.*Creating keyed state backend.* 
from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  # Search for attempts to recover locally.
+  local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* 
from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  if [ ${failed_local_recovery} -ne 0 ]
+  then
+PASS=""
+echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for 
local recovery of correctly scheduled task(s)."
+  fi
+
+  if [ ${attempt_local_recovery} -eq 0 ]
+  then
+PASS=""
+echo "FAILURE: Found no attempt for local recovery. Configuration 
problem?"
+  fi
+}
+
+# This function does a cleanup after the test. The configuration is 
restored, the watchdog is terminated and temporary
+# files and folders are deleted.
+function cleanup_after_test {
+  # Reset the configurations
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' 
"$FLINK_DIR/conf/log4j.properties"
+  #
+  kill ${watchdog_pid} 2> /dev/null
+  wait ${watchdog_pid} 2> /dev/null
+  #
+  cleanup
+}
+
+# Calls the cleanup step for this tests and exits with an error.
+function cleanup_after_test_and_exit_fail {
+  cleanup_after_test
+  exit 1
+}
+
+## This function executes one run for a certain configuration
+function run_local_recovery_test {
+  local parallelism=$1
+  local max_attempts=$2
+  local backend=$3
+  local incremental=$4
+  local kill_jvm=$5
+
+  echo "Running local recovery test on ${backend} backend: incremental 
checkpoints = ${incremental}, kill JVM = ${kill_jvm}."
+  
TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
+
+  # Enable debug logging
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' 
"$FLINK_DIR/conf/log4j.properties"
+
+  # Enable local recovery
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  echo "state.backend.local-recovery: ENABLE_FILE_BASED" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
--- End diff --

FYI: we now have a `change_conf` method in `common.sh` to modify Flink 
configuration in e2e tests
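
E.g., instead of the `sed`/`echo` pair above, something like the following — I haven't double-checked the exact parameter list of `change_conf`, so treat the call signature here as an assumption:

```bash
# assumed signature: change_conf <key> <value> -- please check common.sh
change_conf "state.backend.local-recovery" "ENABLE_FILE_BASED"
```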


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r187842776
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java 
---
@@ -140,7 +140,8 @@
 
public static final ConfigOption SLOT_IDLE_TIMEOUT =
key("slot.idle.timeout")
-   .defaultValue(10L * 1000L)
+   // default matches heartbeat.timeout so that sticky 
allocation is not lost on timeouts for local recovery
+   
.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
--- End diff --

Reminder: if this results in a different default value, I would suggest making a 
note of the change in the "Release Notes" field of the JIRA ticket.


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-05-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r187844104
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh ---
@@ -0,0 +1,115 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+# This function checks the logs for entries that indicate problems with 
local recovery
+function check_logs {
+  local parallelism=$1
+  local attempts=$2
+  (( expected_count=parallelism * (attempts + 1) ))
+
+  # Search for the log message that indicates restore problem from 
existing local state for the keyed backend.
+  local failed_local_recovery=$(grep '^.*Creating keyed state backend.* 
from alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  # Search for attempts to recover locally.
+  local attempt_local_recovery=$(grep '^.*Creating keyed state backend.* 
from alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  if [ ${failed_local_recovery} -ne 0 ]
+  then
+PASS=""
+echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for 
local recovery of correctly scheduled task(s)."
+  fi
+
+  if [ ${attempt_local_recovery} -eq 0 ]
+  then
+PASS=""
+echo "FAILURE: Found no attempt for local recovery. Configuration 
problem?"
+  fi
+}
+
+# This function does a cleanup after the test. The configuration is 
restored, the watchdog is terminated and temporary
+# files and folders are deleted.
+function cleanup_after_test {
+  # Reset the configurations
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' 
"$FLINK_DIR/conf/log4j.properties"
+  #
+  kill ${watchdog_pid} 2> /dev/null
+  wait ${watchdog_pid} 2> /dev/null
+  #
+  cleanup
+}
+
+# Calls the cleanup step for this tests and exits with an error.
+function cleanup_after_test_and_exit_fail {
+  cleanup_after_test
+  exit 1
+}
+
+## This function executes one run for a certain configuration
+function run_local_recovery_test {
+  local parallelism=$1
+  local max_attempts=$2
+  local backend=$3
+  local incremental=$4
+  local kill_jvm=$5
+
+  echo "Running local recovery test on ${backend} backend: incremental 
checkpoints = ${incremental}, kill JVM = ${kill_jvm}."
+  
TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar
+
+  # Enable debug logging
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' 
"$FLINK_DIR/conf/log4j.properties"
+
+  # Enable local recovery
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  echo "state.backend.local-recovery: ENABLE_FILE_BASED" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
+
+  rm $FLINK_DIR/log/* 2> /dev/null
+
+  start_cluster
+
+  tm_watchdog ${parallelism} &
+  watchdog_pid=$!
+
+  echo "Started TM watchdog with PID ${watchdog_pid}."
+
+  $FLINK_DIR/bin/flink run -c 
org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
+  -p ${parallelism} $TEST_PROGRAM_JAR \
+  -D state.backend.local-recovery=ENABLE_FILE_BASED \
+  --checkpointDir file://$TEST_DATA_DIR/local_recovery_test/checkpoints \
+  --output $TEST_DATA_DIR/out/local_recovery_test/out --killJvmOnFail 
${kill_jvm} --checkpointInterval 1000 \
+  --maxAttempts ${max_attempts} --parallelism ${parallelism} 
--stateBacke

[GitHub] flink issue #5969: [FLINK-9074] [e2e] Add e2e for resuming from externalized...

2018-05-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5969
  
Thanks a lot for the review @fhueske.
Will merge this!


---


[GitHub] flink pull request #6004: [FLINK-8977] [e2e] End-to-end test for manual job ...

2018-05-13 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6004

[FLINK-8977] [e2e] End-to-end test for manual job resume after terminal 
failure

## What is the purpose of the change

This PR is based on new e2e features introduced by #5941, #5990, and #5969.
Only the last two commits are relevant to FLINK-8977.

This PR adds e2e test coverage for the case where, after a terminal failure 
caused by the user job code, manually resuming from a retained checkpoint works 
correctly.

This is achieved by extending the `test_resume_externalized_checkpoints.sh` 
test script to accept a `SIMULATE_FAILURE` flag.

## Brief change log

- 9360ea9 Extend the general purpose DataStream job to allow configuring 
restart strategies.
- b5d713c Extend `test_resume_externalized_checkpoints.sh` to allow 
simulating the job failure + manual resume case.

## Verifying this change

Verifiable by running locally the following e2e test script:
`SIMULATE_FAILURE=true 
flink-end-to-end-tests/test-scripts/test_resume_externalized_checkpoints.sh`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8977

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6004.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6004


commit 8db7f894b67b00f94148e0314a1c10d76266a350
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-04-30T10:04:43Z

[hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key 
ranges

commit c8e14673e58aed0f9625e38875ec85a776282ad4
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-04-30T10:05:46Z

[FLINK-8971] [e2e-tests] Include broadcast / union state in general purpose 
DataStream job

commit 78354b295832fa2ec5d829ec4ac21150ecac1231
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-08T03:44:13Z

PR review - refactor source run function

commit f346fd0958e7c3361886680912630fe22761a63d
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-08T04:39:40Z

PR review - simplify broadcast / union state verification

commit b01cfda7d77723e8ded2ce99ee12f17352a3ca1f
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-11T03:51:12Z

[FLINK-9322] [e2e] Add failure simulation to the general purpose DataStream 
job

commit 0931f6ed48523ca46e2c99adc24950777d843ac8
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-11T07:09:00Z

[FLINK-9320] [e2e] Update test_ha e2e to use general purpose DataStream job

commit a819e56e0998e09bb6461b6c76be0807d83a1ef5
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-09T03:40:24Z

[FLINK-9074] [e2e] Allow configuring externalized checkpoints for the 
general purpose DataStream job

commit 3d0c83a991ab78d03c3cc1c9ff2abb61e0329d9d
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-09T04:17:25Z

[FLINK-9074] [e2e] Add e2e test for resuming jobs from retained checkpoints

commit 9360ea9ad0db858e7fdeecb54b1918e6b84cae1d
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-14T03:56:07Z

[FLINK-8977] [e2e] Allow configuring restart strategy for general purpose 
DataStream job

commit b5d713cf19290be437286e152d921b23ff532c7d
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-14T03:56:41Z

[FLINK-8977] [e2e] Extend externalized checkpoint e2e to simulate job 
failures




---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788278
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -382,50 +386,62 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String 
startShardId,
+   

@Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new 
ListShardsRequest();
+   if (startNextToken == null) {
+   
listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 
300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a 
result
+   // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a 
result
try {
-   describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = 
kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
+   + ". 
Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting 
stream details", re);
-   }
-   }
-
-   String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString( {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
-   "describeStream operation will not 
contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception 
if stream in not in active state. Will return
+   LOG.warn("The stream is currently not 
in active state. Reusing the older state "
+   + "for the time being");
+

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788338
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -353,19 +355,21 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
private List getShardsOfStream(String streamName, 
@Nullable String lastSeenShardId) throws InterruptedException {
List shardsOfStream = new ArrayList<>();
 
-   DescribeStreamResult describeStreamResult;
+   // List Shards returns just the first 1000 shard entries. In 
order to read the entire stream,
+   // we need to use the returned nextToken to get additional 
shards.
+   ListShardsResult listShardsResult;
+   String startShardToken = null;
do {
-   describeStreamResult = describeStream(streamName, 
lastSeenShardId);
-
-   List shards = 
describeStreamResult.getStreamDescription().getShards();
+   listShardsResult = listShards(streamName, 
lastSeenShardId, startShardToken);
--- End diff --

If within `listShards(...)` we catch an `ExpiredNextTokenException`, then 
`null` will be returned as the result, correct?
If so, then the currently built-up `shardsOfStream` will be returned 
immediately, regardless of whether or not there are more shards following.

Although expired tokens might not be too common here, I wonder if we can 
handle this case more gracefully (e.g., re-fetching a token to make sure that 
there really are no more shards left).


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788213
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis 
stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = 
"flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
--- End diff --

Changing the name of this variable, strictly speaking, breaks backwards 
compatibility, as users might already be using it.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788363
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -70,20 +113,107 @@ public void 
testIsRecoverableExceptionWithNullErrorType() {
}
 
@Test
-   public void testCustomConfigurationOverride() {
-   Properties configProps = new Properties();
-   configProps.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
-   KinesisProxy proxy = new KinesisProxy(configProps) {
-   @Override
-   protected AmazonKinesis createKinesisClient(Properties 
configProps) {
-   ClientConfiguration clientConfig = new 
ClientConfigurationFactory().getConfig();
-   clientConfig.setSocketTimeout(1);
-   return AWSUtil.createKinesisClient(configProps, 
clientConfig);
+   public void testGetShardList() throws Exception {
+   List shardIds =
+   Arrays.asList(
+   "shardId-",
+   "shardId-0001",
+   "shardId-0002",
+   "shardId-0003");
+   shardIdSet = new HashSet<>(shardIds);
+   shards =
+   shardIds
+   .stream()
+   .map(shardId -> new 
Shard().withShardId(shardId))
+   .collect(Collectors.toList());
+   Properties kinesisConsumerConfig = new Properties();
+   
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+   
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"fake_accesskey");
+   kinesisConsumerConfig.setProperty(
+   ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"fake_secretkey");
+   KinesisProxy kinesisProxy = new 
KinesisProxy(kinesisConsumerConfig);
+   AmazonKinesis mockClient = mock(AmazonKinesis.class);
+   Whitebox.setInternalState(kinesisProxy, "kinesisClient", 
mockClient);
+
+   ListShardsResult responseWithMoreData =
+   new 
ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
+   ListShardsResult responseFinal =
+   new 
ListShardsResult().withShards(shards.subList(2, 
shards.size())).withNextToken(null);
+   doReturn(responseWithMoreData)
+   .when(mockClient)
+   
.listShards(argThat(initialListShardsRequestMatcher()));
+   
doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
+   HashMap<String, String> streamHashMap =
+   
createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
+   GetShardListResult shardListResult = 
kinesisProxy.getShardList(streamHashMap);
+
+   Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
+
+   Set expectedStreams = new HashSet<>();
+   expectedStreams.add(fakeStreamName);
+   
Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), 
expectedStreams);
+   List actualShardList =
+   
shardListResult.getRetrievedShardListOfStream(fakeStreamName);
+   List expectedStreamShard = new ArrayList<>();
+   System.out.println(actualShardList.toString());
+   assertThat(actualShardList, hasSize(4));
+   for (int i = 0; i < 4; i++) {
+   StreamShardHandle shardHandle =
+   new StreamShardHandle(
+   fakeStreamName,
+   new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
+   expectedStreamShard.add(shardHandle);
+   }
+
+   Assert.assertThat(
+   actualShardList,
+   containsInAnyOrder(
+   expectedStreamShard.toArray(new 
StreamShardHandle[actualShardList.size()])));
+   }
+
+   private static class ListShardsRequestMatcher

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788223
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis 
stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = 
"flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
--- End diff --

This goes the same for all other key variable rename changes in this class.
I would suggest to deprecate existing ones if we want to change the name 
internally.
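
For illustration, a minimal sketch of the suggested deprecation pattern (the replacement constant and key names below are hypothetical, since the new names are not visible in the quoted diff):

    /** The base backoff time between each listShards attempt (hypothetical new name). */
    public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

    /** @deprecated Use {@link #LIST_SHARDS_BACKOFF_BASE} instead; kept so existing user configurations keep working. */
    @Deprecated
    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";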


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-12 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r187788284
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -382,50 +386,62 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String 
startShardId,
+   

@Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new 
ListShardsRequest();
+   if (startNextToken == null) {
+   
listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 
300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a 
result
+   // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a 
result
try {
-   describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = 
kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
+   + ". 
Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting 
stream details", re);
-   }
-   }
-
-   String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString( {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
-   "describeStream operation will not 
contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception 
if stream in not in active state. Will return
+   LOG.warn("The stream is currently not 
in active state. Reusing the older state "
+   + "for the time being");
+
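
For reference, a minimal sketch of the full-jitter backoff helper referenced in the retry loop above (an assumption based on the standard full-jitter formula, not a quote of the actual `KinesisProxy` implementation):

    import java.util.Random;

    private static final Random seed = new Random();

    /** Returns a random delay in [0, min(max, base * power^attempt)). */
    private static long fullJitterBackoff(long base, long max, double power, int attempt) {
        long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
        return (long) (seed.nextDouble() * exponentialBackoff);
    }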

[GitHub] flink issue #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE if top...

2018-05-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5929
  
@alexpf thanks for the updates.
Please bear with me just a little, I'll try to get back to reviewing this 
PR ASAP after I finish with some ongoing tasks.


---


[GitHub] flink pull request #5990: [FLINK-9322][FLINK-9320] [e2e] Improvements to e2e...

2018-05-11 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5990

[FLINK-9322][FLINK-9320] [e2e] Improvements to e2e standalone chaos monkey 
test

## What is the purpose of the change

This PR is based on #5941. Only the last 2 commits are relevant.

This PR improves our standalone e2e chaos monkey test by:
- Using the general purpose DataStream job, instead of the state machine 
example, to have a wider coverage of commonly used DataStream program building 
blocks.
- Lets the running job simulate failures by throwing exceptions. This increases the intensity of the chaos monkey test (see the sketch after this list).
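
The failure simulation could, for example, look like the following sketch (a hypothetical mapper, not the committed general purpose job code; the record threshold is illustrative):

    import org.apache.flink.api.common.functions.MapFunction;

    /** Throws an artificial exception after a configured number of records, to exercise recovery. */
    public class FailureSimulatingMapper<T> implements MapFunction<T, T> {

        private final long failAfterNumRecords;

        // not checkpointed, so the counter restarts from zero after every recovery
        private transient long recordsSinceLastRestore;

        public FailureSimulatingMapper(long failAfterNumRecords) {
            this.failAfterNumRecords = failAfterNumRecords;
        }

        @Override
        public T map(T value) throws Exception {
            if (++recordsSinceLastRestore == failAfterNumRecords) {
                throw new RuntimeException("Artificial failure for testing recovery");
            }
            return value;
        }
    }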

## Brief change log

- b01cfda Allows the general purpose job to configure whether or not to 
simulate failures. This resolves FLINK-9322.
- 4009406 in `test_ha.sh`, use the general purpose job instead. This change additionally lets the e2e test exercise failures caused by the user application, and not just TM / JM shutdowns. It also changes the parameterization of the test script to be consistent with our other e2e test scripts.

## Verifying this change

This is purely a change to improve current e2e tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink chaos-monkey-e2e

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5990.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5990


commit 8db7f894b67b00f94148e0314a1c10d76266a350
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-04-30T10:04:43Z

[hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key 
ranges

commit c8e14673e58aed0f9625e38875ec85a776282ad4
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-04-30T10:05:46Z

[FLINK-8971] [e2e-tests] Include broadcast / union state in general purpose 
DataStream job

commit 78354b295832fa2ec5d829ec4ac21150ecac1231
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-08T03:44:13Z

PR review - refactor source run function

commit f346fd0958e7c3361886680912630fe22761a63d
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-08T04:39:40Z

PR review - simplify broadcast / union state verification

commit b01cfda7d77723e8ded2ce99ee12f17352a3ca1f
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-11T03:51:12Z

[FLINK-9322] [e2e] Add failure simulation to the general purpose DataStream 
job

commit 4009406d4729486d57cc4a71bcb72d269583a762
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-11T07:09:00Z

[FLINK-9320] [e2e] Update test_ha e2e to use general purpose DataStream job




---


[GitHub] flink pull request #5969: [FLINK-9074] [e2e] Add e2e for resuming from exter...

2018-05-09 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5969

[FLINK-9074] [e2e] Add e2e for resuming from externalized checkpoints

## What is the purpose of the change

This PR adds an end-to-end test for resuming from externalized, retained 
checkpoints.

The test does the following:
- Runs the general purpose DataStream job, with externalized checkpoints enabled
- Waits until the job has at least 1 completed checkpoint, AND has processed at least 200 records
- Cancels the job
- Makes sure that there is exactly 1 externalized checkpoint available
- Restores from that checkpoint, then waits for another 200 records to be processed to verify that exactly-once isn't violated

## Brief change log

- Allow the general purpose job to be configured for externalized checkpoints (see the configuration sketch after this list)
- Add new e2e test script `test_resume_externalized_checkpoint.sh`
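
A minimal sketch of the configuration this enables (standard `CheckpointConfig` API; the checkpoint interval is just an example value):

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1000L);
    // Retain the checkpoint when the job is cancelled, so the test can resume from it afterwards.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);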

## Verifying this change

This PR adds a new test, `test_resume_externalized_checkpoint.sh`.

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-9074

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5969.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5969


commit bb8dce7622d83d4c2214013de24193ffa47a1e75
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-09T03:40:24Z

[FLINK-9074] [e2e] Allow configuring externalized checkpoints for the 
general purpose DataStream job

commit 91e7f911739a094d58051a7b620a4564fd7f6067
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-05-09T04:17:25Z

[FLINK-9074] [e2e] Add e2e test for resuming jobs from retained checkpoints




---


[GitHub] flink issue #5941: [FLINK-8971] [e2e] Include broadcast / union state in gen...

2018-05-08 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5941
  
@StefanRRichter the PR is ready for another review, thanks!


---


[GitHub] flink issue #5926: [FLINK-9073] [e2e-tests] Extend savepoint e2e tests for d...

2018-05-07 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5926
  
Thanks for the review @StefanRRichter! Will address your comment and merge 
this.


---


[GitHub] flink pull request #5926: [FLINK-9073] [e2e-tests] Extend savepoint e2e test...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5926#discussion_r186608526
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -58,25 +58,97 @@ fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
-  printf "Running Resuming Savepoint (no parallelism change) end-to-end 
test\n"
+  printf "Running Resuming Savepoint (file, async, no parallelism change) 
end-to-end test\n"
   printf 
"==\n"
-  $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
-  printf "Running Resuming Savepoint (scale up) end-to-end test\n"
+  printf "Running Resuming Savepoint (file, sync, no parallelism change) 
end-to-end test\n"
   printf 
"==\n"
-  $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
-  printf "Running Resuming Savepoint (scale down) end-to-end test\n"
+  printf "Running Resuming Savepoint (file, async, scale up) end-to-end 
test\n"
   printf 
"==\n"
-  $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf 
"\n==\n"
+  printf "Running Resuming Savepoint (file, sync, scale up) end-to-end 
test\n"
+  printf 
"==\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf 
"\n==\n"
+  printf "Running Resuming Savepoint (file, async, scale down) end-to-end 
test\n"
+  printf 
"==\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf 
"\n==\n"
+  printf "Running Resuming Savepoint (file, sync, scale down) end-to-end 
test\n"
+  printf 
"==\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf 
"\n==\n"
+  printf "Running Resuming Savepoint (rocks, non-incremental, no 
parallelism change) end-to-end test\n"
+  printf 
"==\n"
+  STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=false 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf 
"\n==\n"
+  printf "Running Resuming Savepoint (rocks, incremental, no parallelism 
change) end-to-end test\n"
+  printf 
"==\n"
+  STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
--- End diff --

This makes sense, will address this.


---


[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

2018-05-07 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958
  
Thanks for the update @FredTing.
I'll try to take another look at the PR within the next days.


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186606104
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema extends Serializable, 
ResultTypeQueryable {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
+*/
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
/**
 * Deserializes the byte message.
 *
-* @param message The message, as a byte array.
+* @param consumerRecordMetaInfossage The message, as a {@link 
ConsumerRecordMetaInfo}.
 *
 * @return The deserialized message as an object (null if the message 
cannot be deserialized).
 */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo 
consumerRecordMetaInfossage) throws IOException {
--- End diff --

Makes sense. Alright, lets leave this as is then.


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r186341972
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,279 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different 
names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolves over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
+   as an argument.
+
+
+{% highlight java %}
+DataStream output = colorPartitionedStream
+ .connect(ruleBroadcastStream)
+ .process(
+ 
+ 

[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r186341885
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,279 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different 
names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolves over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
+   as an argument.
+
+
+{% highlight java %}
+DataStream output = colorPartitionedStream
+ .connect(ruleBroadcastStream)
+ .process(
+ 
+ 

[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r186341059
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,279 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different 
names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolves over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
--- End diff --

The BroadcastStream actually isn't formatted as a code font after rendering.


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r186340944
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,279 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different 
names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolves over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
--- End diff --

After rendering, this seems a bit off. Maybe the closing `);` doesn't need to be on a separate line?
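
For example, the snippet renders more cleanly with the closing parenthesis attached to the last argument (same code, only reformatted):

    MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
        "RulesBroadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(new TypeHint<Rule>() {}));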


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r186342816
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,279 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different 
names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolves over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
+   as an argument.
+
+
+{% highlight java %}
+DataStream output = colorPartitionedStream
+ .connect(ruleBroadcastStream)
+ .process(
+ 
+ 

[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r186340110
  
--- Diff: docs/dev/stream/state/index.md ---
@@ -49,6 +49,7 @@ Where to go next?
 -
 
 * [Working with State](state.html): Shows how to use state in a Flink 
application and explains the different kinds of state.
+* [The Broadcast State Pattern](broadcast_state.html): Explains how to 
connect a broadcast with a non-broadcast stream and use state to exchange 
information between them. 
--- End diff --

how to connect a broadcast "stream" with a ...
missing "stream" word


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r186339587
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -25,27 +25,25 @@ under the License.
 * ToC
 {:toc}
 
-[Working with State](state.html) described operator state which is either 
**evenly** distributed among the parallel
-tasks of an operator, or state which **upon restore**, its partial (task) 
states are **unioned** and the whole state is 
-used to initialize the restored parallel tasks.
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
 
-A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use-cases
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
 where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
 and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
 natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
-elements coming from another stream. Having the above type of use-cases in 
mind, broadcast state differs from the rest 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
 of operator states in that:
  1. it has a map format,
- 2. it is only available to streams whose elements are *broadcasted*,
- 3. the only operation available to a stream with broadcast state is to be 
*connected* to another keyed or non-keyed stream,
- 4. such a broadcast stream can have *multiple broadcast states* with 
different names.
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
--- End diff --

This looks good now (from what I read of it :) )!


---


[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r186342376
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,279 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different 
names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolves over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
+   as an argument.
+
+
+{% highlight java %}
+DataStream output = colorPartitionedStream
+ .connect(ruleBroadcastStream)
+ .process(
+ 
+ 

[GitHub] flink pull request #5922: [FLINK-8780] [docs] Add Broadcast State documentat...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5922#discussion_r186341293
  
--- Diff: docs/dev/stream/state/broadcast_state.md ---
@@ -0,0 +1,279 @@
+---
+title: "The Broadcast State Pattern"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+
+
+* ToC
+{:toc}
+
+[Working with State](state.html) describes operator state which upon 
restore is either evenly distributed among the 
+parallel tasks of an operator, or unioned, with the whole state being used 
to initialize the restored parallel tasks.
+
+A third type of supported *operator state* is the *Broadcast State*. 
Broadcast state was introduced to support use cases
+where some data coming from one stream is required to be broadcasted to 
all downstream tasks, where it is stored locally
+and is used to process all incoming elements on the other stream. As an 
example where broadcast state can emerge as a 
+natural fit, one can imagine a low-throughput stream containing a set of 
rules which we want to evaluate against all 
+elements coming from another stream. Having the above type of use cases in 
mind, broadcast state differs from the rest 
+of operator states in that:
+ 1. it has a map format,
+ 2. it is only available to specific operators that have as inputs a 
*broadcasted* stream and a *non-broadcasted* one, and
+ 3. such an operator can have *multiple broadcast states* with different 
names.
+
+## Provided APIs
+
+To show the provided APIs, we will start with an example before presenting 
their full functionality. As our running 
+example, we will use the case where we have a stream of objects of 
different colors and shapes and we want to find pairs
+of objects of the same color that follow a certain pattern, *e.g.* a 
rectangle followed by a triangle. We assume that
+the set of interesting patterns evolves over time. 
+
+In this example, the first stream will contain elements of type `Item` 
with a `Color` and a `Shape` property. The other
+stream will contain the `Rules`.
+
+Starting from the stream of `Items`, we just need to *key it* by `Color`, 
as we want pairs of the same color. This will
+make sure that elements of the same color end up on the same physical 
machine.
+
+{% highlight java %}
+// key the shapes by color
+KeyedStream<Item, Color> colorPartitionedStream = shapeStream
+.keyBy(new KeySelector<Shape, Color>(){...});
+{% endhighlight %}
+
+Moving on to the `Rules`, the stream containing them should be broadcasted 
to all downstream tasks, and these tasks 
+should store them locally so that they can evaluate them against all 
incoming `Items`. The snippet below will i) broadcast 
+the stream of rules and ii) using the provided `MapStateDescriptor`, it 
will create the broadcast state where the rules
+will be stored.
+
+{% highlight java %}
+
+// a map descriptor to store the name of the rule (string) and the rule 
itself.
+MapStateDescriptor<String, Rule> ruleStateDescriptor = new 
MapStateDescriptor<>(
+   "RulesBroadcastState",
+   BasicTypeInfo.STRING_TYPE_INFO,
+   TypeInformation.of(new TypeHint() {})
+   );
+   
+// broadcast the rules and create the broadcast state
+BroadcastStream ruleBroadcastStream = ruleStream
+.broadcast(ruleStateDescriptor);
+{% endhighlight %}
+
+Finally, in order to evaluate the `Rules` against the incoming elements 
from the `Item` stream, we need to:
+1) connect the two streams and 
+2) specify our match detecting logic. 
+
+Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be 
done by calling `connect()` on the 
+non-broadcasted stream, with the `BroadcastStream` as an argument. This 
will return a `BroadcastConnectedStream`, on 
+which we can call `process()` with a special type of `CoProcessFunction`. 
The function will contain our matching logic. 
+The exact type of the function depends on the type of the non-broadcasted 
stream: 
+ - if that is **keyed**, then the function is a 
`KeyedBroadcastProcessFunction`. 
+ - if it is **non-keyed**, the function is a `BroadcastProcessFunction`. 
+ 
+ Given that our non-broadcasted stream is keyed, the following snippet 
includes the above calls:
+
+
+  Attention: The connect should be called on the 
non-broadcasted stream, with the `BroadcastStream`
--- End diff --

What happens if the user calls connect on the wrong (broadcasted) stream?
Is there an exception? If so it might make sense to clarify that here.


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186338002
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema extends Serializable, 
ResultTypeQueryable {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
--- End diff --

Unnecessary space before period at the end.


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186337736
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, 
some meta information, such as
+ * key, topic, partition, offset and timestamp for Apache kafka
+ *
+ * Note:The timestamp is only valid for Kafka clients 0.10+, for 
older versions the value has the value `Long.MinValue` and
+ * the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This 
interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
--- End diff --

I wonder if `CREATE_TIME` should be renamed as `EVENT_TIME`, to be more 
coherent with Flink's terminologies.


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186337890
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, 
some meta information, such as
+ * key, topic, partition, offset and timestamp for Apache kafka
+ *
+ * Note:The timestamp is only valid for Kafka clients 0.10+, for 
older versions the value has the value `Long.MinValue` and
+ * the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This 
interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
--- End diff --

`NO_TIMESTAMP_TYPE` --> maybe `NO_TIMESTAMP` will do, since from the enum 
name we already know it is a type.
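
Combined with the `EVENT_TIME` naming suggestion in the previous comment, the enum would then read (purely illustrative naming, not a committed API):

    enum TimestampType {
        NO_TIMESTAMP, EVENT_TIME, INGEST_TIME
    }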


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186338049
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema extends Serializable, 
ResultTypeQueryable {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
--- End diff --

For the deprecation, I would recommend explaining why the new deserialize method is superior.
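
For example, the deprecation note could read something like this (illustrative wording only):

    /**
     * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} instead, which exposes the
     *             record's meta information (key, topic, partition, offset, timestamp) in
     *             addition to the raw message bytes.
     */
    @Deprecated
    T deserialize(byte[] message) throws IOException;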


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186337834
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, 
some meta information, such as
+ * key, topic, partition, offset and timestamp for Apache kafka
+ *
+ * Note:The timestamp is only valid for Kafka clients 0.10+, for 
older versions the value has the value `Long.MinValue` and
+ * the timestampType has the value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This 
interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
+   }
+
+   /**
+* @return the key as a byte array (null if no key has been set).
+*/
+   byte[] getKey();
+
+   /**
+* @return The message, as a byte array (null if the message was empty 
or deleted).
+*/
+   byte[] getMessage();
+
+   /**
+* @return The topic the message has originated from (for example the 
Kafka topic).
+*/
+   String getTopic();
+
+   /**
+* @return The partition the message has originated from (for example 
the Kafka partition).
+*/
+   int getPartition();
+
+   /**
+* @return the offset of the message in the original source (for 
example the Kafka offset).
+*/
+   long getOffset();
+
+   /**
+* @return the timestamp of the consumer record
--- End diff --

The Javadoc should document the "dummy" timestamp value that is returned when the timestamp type is `NO_TIMESTAMP`.
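
For instance (illustrative Javadoc only):

    /**
     * @return the timestamp of the consumer record, or {@code Long.MIN_VALUE} if the
     *         timestamp type is {@code NO_TIMESTAMP_TYPE} (e.g. Kafka clients older than 0.10).
     */
    long getTimestamp();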


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186338721
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)} .
+*/
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
/**
 * Deserializes the byte message.
 *
-* @param message The message, as a byte array.
+* @param consumerRecordMetaInfo The message, as a {@link ConsumerRecordMetaInfo}.
 *
 * @return The deserialized message as an object (null if the message 
cannot be deserialized).
 */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfo) throws IOException {
--- End diff --

I'm actually not sure that we should continue using this class, for the 
following reason:

The class is placed under a non-ideal package: 
`o.a.f.api.common.serialization`, whereas it should live under some 
`o.a.f.connectors.kafka` package.
The reason it is currently placed under this package is that the 
`DeserializationSchema` was initially intended to be commonly used by all 
connectors. However, over time, things have proven that each connector 
benefits from its own version of a schema class.

So, it actually might make sense to deprecate the whole 
`DeserializationSchema` class now, and have a new class (maybe called 
`KafkaDeserializationSchema` / `KafkaSerializationSchema`) under a proper 
Kafka package.
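
For concreteness, a rough sketch of what I have in mind (names and package are only placeholders, not a final proposal):

```java
package org.apache.flink.streaming.connectors.kafka;

import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Sketch of a Kafka-specific deserialization schema that hands the whole Kafka record
 * (key, value, topic, partition, offset, timestamp) to the user, instead of only the
 * value bytes as the generic DeserializationSchema does.
 */
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

	/**
	 * Deserializes the full Kafka record.
	 *
	 * @return the deserialized element, or null if the record should be skipped.
	 */
	T deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException;
}
```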

What do you think?


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186337301
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -78,6 +79,69 @@ public Kafka010Fetcher(
useMetrics);
}
 
+   private class KafkaConsumerRecordWrapper10 implements 
ConsumerRecordMetaInfo {
+   private static final long serialVersionUID = 
2651665280744549935L;
+
+   private final ConsumerRecord<byte[], byte[]> consumerRecord;
+
+   public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], 
byte[]> consumerRecord) {
+   this.consumerRecord = consumerRecord;
+   }
+
+   @Override
+   public byte[] getKey() {
+   return consumerRecord.key();
+   }
+
+   @Override
+   public byte[] getMessage() {
+   return consumerRecord.value();
+   }
+
+   @Override
+   public String getTopic() {
+   return consumerRecord.topic();
+   }
+
+   @Override
+   public int getPartition() {
+   return consumerRecord.partition();
+   }
+
+   @Override
+   public long getOffset() {
+   return consumerRecord.offset();
+   }
+
+   @Override
+   public long getTimestamp() {
+   return Long.MIN_VALUE;
--- End diff --

Doesn't Kafka 0.10 support record timestamps?
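The 0.10 `ConsumerRecord` already exposes the record timestamp, so the wrapper could presumably just delegate here, e.g.:

```java
@Override
public long getTimestamp() {
	// ConsumerRecord#timestamp() is available from the 0.10 Kafka client onwards
	return consumerRecord.timestamp();
}
```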


---


[GitHub] flink issue #5006: [hotfix][docs][QS] MInor cleanup of QS documentation

2018-05-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5006
  
+1, LGTM. Will address Greg's last comment and merge this ..


---


[GitHub] flink issue #5956: [hotfix][javadocs] Correct the javadoc of StandaloneCheck...

2018-05-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5956
  
LGTM, thanks @klion26.
Merging this ..


---


[GitHub] flink issue #5890: [FLINK-8999] [e2e] Ensure the job has an operator with op...

2018-05-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5890
  
Hi @zhangminglei, I think the current general purpose DataStream job 
already subsumes this PR; for example, the sequence source generator in the job 
already uses operator state. Moreover, `test_resume_savepoint.sh` also tests 
that behaviour via a rescale operation.

What do you think? If you agree, we can maybe close this PR.


---


[GitHub] flink issue #5918: [hotfix][ListCheckpointed]modify parallelism in the annot...

2018-05-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5918
  
Thanks @maqingxiang! This LGTM, will merge this, thanks.


---


[GitHub] flink issue #5952: [FLINK-9287][kafka] Properly clean up resources in non EX...

2018-05-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5952
  
As discussed offline, neither of the approaches I mentioned seems to work.
I'll merge this as is; let's keep an eye on it to see whether the test is flaky.


---


[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

2018-05-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
Thanks for the review Stefan! Will merge this now ..


---


[GitHub] flink issue #5952: [FLINK-9287][kafka] Properly clean up resources in non EX...

2018-05-04 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5952
  
Would filtering the thread count by thread name be helpful here?

Another possible approach to test this:
Maybe we can simply verify that the current transaction producer is closed 
after the cleanup?
This would require making the `getCurrentTransaction()` method visible for 
tests, though. We'll also need an `isClosed()` method on the 
`FlinkKafkaProducer`.


---


[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

2018-05-04 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
@StefanRRichter I have updated the PR. Also had to do a rebase due to 
conflicts.

Regarding the thoughts you brought up:
- Bubble up `UnloadableTypeSerializerException` approach:
I introduced the exception, but only use it minimally (see the sketch at the 
end of this comment). I think overall it is definitely an improvement, since we 
don't have to carry the dummy flag all the way down to the low-level serializer 
serialization proxy. However, I don't think there really is a need to handle it 
at any level higher than the 
`TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience` 
method, since the original intent of that method was to always be fault 
tolerant when reading a bunch of serializers alongside other things. More 
details on that in my comments above.

- Whether or not we really need to hand down the flag to 
`KeyedBackendSerializationProxy`:
My gut feeling is that, even if in the future we bubble up the 
`UnloadableTypeSerializerException` to higher level components, the 
serialization proxy is still where we need to decide whether or not we handle 
it with a dummy serializer. The reason is that the serialization proxy 
handles deserialization of _all_ keyed state meta infos (and therefore their 
serializers); simply bubbling the exception further up without checking does 
not make sense.
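
To make the first point concrete, the resilient read path would roughly end up looking like this (just a sketch; the exception and the accessor on it are the ones proposed here, not existing API):

```java
TypeSerializer<?> serializer;
try {
	serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
} catch (UnloadableTypeSerializerException e) {
	// assumption: the exception keeps the original serializer bytes around, so the
	// placeholder can write them back unchanged if the state is snapshotted again
	serializer = new UnloadableDummyTypeSerializer<>(e.getSerializerBytes());
}
```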


---


[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-04 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185994375
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void 
writeSerializersAndConfigsWithResilience(
for (int i = 0; i < numSerializersAndConfigSnapshots; 
i++) {
 
bufferWithPos.setPosition(offsets[i * 2]);
-   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader);
+   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader, true);
--- End diff --

The reason this method is so complex is that it handles indexing of 
the serializers' and serializer config snapshots' offsets within the byte 
stream. It does so to be able to read all serializers and their serializer 
config snapshots fault tolerantly, without leaving the stream corrupt when 
some exception occurs.

I'm not sure we can break this method up - doing so would just move a 
lot of duplicated code to the callers (the offset index reading / writing 
already exists in written savepoints, so even if we remove it going forward we 
still need to keep reading it for backwards compatibility).


---


[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-04 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185995278
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void 
writeSerializersAndConfigsWithResilience(
for (int i = 0; i < numSerializersAndConfigSnapshots; 
i++) {
 
bufferWithPos.setPosition(offsets[i * 2]);
-   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader);
+   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader, true);
--- End diff --

One other thing to keep in mind:
The original intent of the 
`readSerializersAndConfigsWithResilience` method, and the reason we added the 
complex indexing of offsets, is that we can _always_ be fault tolerant when 
trying to read a bunch of serializers. So, essentially, there is no need to 
push the exception out further - the result of 
`readSerializersAndConfigsWithResilience` should always be that there is some 
serializer, even if it is a dummy (hence the naming).


---


[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-04 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185994454
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void 
writeSerializersAndConfigsWithResilience(
for (int i = 0; i < numSerializersAndConfigSnapshots; 
i++) {
 
bufferWithPos.setPosition(offsets[i * 2]);
-   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader);
+   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader, true);
--- End diff --

Since we are already thinking about no longer writing serializers into 
savepoints in 1.6, I'm leaning towards not touching this method now.


---


[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

2018-05-03 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
I think even with the `UnloadableTypeSerializerException` bubbling approach, 
we still need a flag in the serialization proxy to decide how to handle the 
exception.

The serialization proxy handles deserialization of the meta data of all 
registered keyed states, so that would be the highest level where we need to 
decide whether or not to use the dummy serializer.

If we want to hand this control out to an even higher level (i.e. the 
backend), we would then need to break the deserialization logic out of the 
serialization proxy, which IMO isn't appropriate.


---


[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

2018-05-03 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
@StefanRRichter yes, now that you mention it, the 
`isSerializerPresenceRequiredFlag` does seem a bit awkward to have in the 
serialization proxy. Essentially, all it really does is serve as a switch 
to decide whether or not to fail - something that could be done by the caller.

I'll quickly try your suggested approach and see how that turns out.


---


[GitHub] flink pull request #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE...

2018-05-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5929#discussion_r185759080
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
 ---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
 
try {
for (String topic : topics) {
-   for (PartitionInfo partitionInfo : 
kafkaConsumer.partitionsFor(topic)) {
+   List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+   if (topicPartitions == null) {
+   throw new IllegalStateException("The 
topic " + topic + " does not exist");
--- End diff --

I think the `RuntimeException` in 
`AbstractPartitionDiscoverer#discoverPartitions` may also need to be revisited.
As far as I understand it, we should only fail the job if the first 
discovery (which seeds the initial partitions that the connector consumes) 
comes back empty across all topics. Otherwise, it should be ok if, while the 
job runs, the partition discoverer fails to fetch partition meta info for some 
discovery attempt.


---


[GitHub] flink issue #5939: [FLINK-8500] [Kafka Connector] Get the timestamp of the K...

2018-05-03 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5939
  
@StephanEwen we have some follow-up discussion on how to approach this on 
the JIRA ticket. Do you want to take a look at that? 
https://issues.apache.org/jira/browse/FLINK-8500

The general idea is that we should perhaps come up with a more generic 
schema interface that exposes all the information that Kafka gives us and 
provides it to the user.


---


[GitHub] flink pull request #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE...

2018-05-03 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5929#discussion_r185732772
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09PartitionDiscoverer.java
 ---
@@ -74,7 +74,12 @@ protected void initializeConnections() {
 
try {
for (String topic : topics) {
-   for (PartitionInfo partitionInfo : 
kafkaConsumer.partitionsFor(topic)) {
+   List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
+   if (topicPartitions == null) {
+   throw new IllegalStateException("The 
topic " + topic + " does not exist");
--- End diff --

I fear that this might be too aggressive.
IMO, it is fine if the user provides, say, 3 topics and only one of them 
doesn't actually exist.

What we should handle is the case where there are no partitions at all 
across all provided topics.
Perhaps, for a single missing topic, we only log a warning that it has no partitions?
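
Roughly something like this (just a sketch; `LOG` and how the partitions are collected afterwards are assumed from the surrounding discoverer code):

```java
for (String topic : topics) {
	List<PartitionInfo> topicPartitions = kafkaConsumer.partitionsFor(topic);
	if (topicPartitions == null || topicPartitions.isEmpty()) {
		// a single missing topic should not fail the job; the "no partitions at all
		// across every subscribed topic" case can still be checked afterwards
		LOG.warn("Topic {} currently has no partitions (it may not exist).", topic);
		continue;
	}
	for (PartitionInfo partitionInfo : topicPartitions) {
		// ... collect the partition as before ...
	}
}
```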


---


[GitHub] flink pull request #5945: [FLINK-9169] [runtime] Allow KeyedBackendSerializa...

2018-05-03 Thread tzulitai
Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/5945


---


[GitHub] flink issue #5945: [FLINK-9169] [runtime] Allow KeyedBackendSerializationPro...

2018-05-03 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5945
  
I've opened a new PR #5950 which has a cleaner approach to this.
That new PR subsumes this one.


---

