[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests

2018-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354370#comment-16354370
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8398:


Merged.

1.4 - ca1e525c5c2ddb2b36fcdd7fe8d2e7b053a063cf
1.5 - 3f0d6c618302e4b341e5a442126e6ba2889cd2f4

> Stabilize flaky KinesisDataFetcherTests
> ---
>
> Key: FLINK-8398
> URL: https://issues.apache.org/jira/browse/FLINK-8398
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector, Tests
> Affects Versions: 1.4.0, 1.5.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Fix For: 1.5.0, 1.4.1
>
>
> The unit tests in {{KinesisDataFetcherTest}} have very flaky implementations. 
> They rely on thread sleeps to wait for a certain operation to happen; 
> a sleep can easily be too short, causing the test to fail.
> Although there aren't any reports of consistent failures on these tests yet 
> (as far as I am aware), such failures can easily surface in the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354355#comment-16354355
 ] 

ASF GitHub Bot commented on FLINK-8398:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5268




[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346738#comment-16346738
 ] 

ASF GitHub Bot commented on FLINK-8398:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5268
  
Thanks for the review @zentol.
Will rebase, address your comment, and then merge.




[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests

2018-01-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343235#comment-16343235
 ] 

ASF GitHub Bot commented on FLINK-8398:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5268#discussion_r164401125
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java ---
@@ -18,123 +18,110 @@
 package org.apache.flink.streaming.connectors.kinesis.testutils;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
 
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Extension of the {@link KinesisDataFetcher} for testing.
  */
-public class TestableKinesisDataFetcher extends KinesisDataFetcher {
-
-    private static final Object fakeCheckpointLock = new Object();
-
-    private long numElementsCollected;
+public class TestableKinesisDataFetcher extends KinesisDataFetcher {
 
     private OneShotLatch runWaiter;
+    private OneShotLatch initialDiscoveryWaiter;
+
+    private volatile boolean running;
 
     public TestableKinesisDataFetcher(
             List fakeStreams,
+            SourceFunction.SourceContext sourceContext,
             Properties fakeConfiguration,
+            KinesisDeserializationSchema deserializationSchema,
             int fakeTotalCountOfSubtasks,
             int fakeIndexOfThisSubtask,
             AtomicReference thrownErrorUnderTest,
             LinkedList subscribedShardsStateUnderTest,
             HashMap subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
             KinesisProxyInterface fakeKinesis) {
-        super(fakeStreams,
-            getMockedSourceContext(),
-            fakeCheckpointLock,
+        super(
+            fakeStreams,
+            sourceContext,
+            sourceContext.getCheckpointLock(),
             getMockedRuntimeContext(fakeTotalCountOfSubtasks, fakeIndexOfThisSubtask),
             fakeConfiguration,
-            new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
+            deserializationSchema,
             thrownErrorUnderTest,
             subscribedShardsStateUnderTest,
             subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
             fakeKinesis);
 
-        this.numElementsCollected = 0;
         this.runWaiter = new OneShotLatch();
-    }
+        this.initialDiscoveryWaiter = new OneShotLatch();
 
-    public long getNumOfElementsCollected() {
-        return numElementsCollected;
+        this.running = true;
     }
 
     @Override
-    protected KinesisDeserializationSchema getClonedDeserializationSchema() {
-        return new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema());
+    public void runFetcher() throws Exception {
+        runWaiter.trigger();
+        super.runFetcher();
+    }
+
+    public void waitUntilRun() throws Exception {
+        runWaiter.await();
     }
 
     @Override
-    protected void emitRecordAndUpdateState(String record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
-        synchronized (fakeCheckpoi

[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319741#comment-16319741
 ] 

ASF GitHub Bot commented on FLINK-8398:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5268

[FLINK-8398] [kinesis, tests] Harden KinesisDataFetcherTest unit tests

## What is the purpose of the change

Prior to this PR, many of the `KinesisDataFetcherTest` unit tests relied on 
thread sleeps to wait until a certain operation had occurred before the test 
could pass. This makes the tests very flaky; the sleeps should be replaced with 
`OneShotLatch`.
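
The difference between sleeping and waiting on a latch can be sketched as follows. This is a minimal illustration, not the code from this PR: it uses `java.util.concurrent.CountDownLatch` with a count of one as a stand-in for Flink's `OneShotLatch` so that it runs without Flink on the classpath, and the names `LatchInsteadOfSleep`, `FakeFetcher`, and `runAndAwaitDiscovery` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchInsteadOfSleep {

    /** Hypothetical stand-in for a fetcher whose background work the test must wait for. */
    static class FakeFetcher implements Runnable {
        // A count of one behaves like a one-shot latch (assumption: stand-in for OneShotLatch).
        final CountDownLatch discoveryDone = new CountDownLatch(1);
        final AtomicInteger discoveredShards = new AtomicInteger();

        @Override
        public void run() {
            discoveredShards.set(4);    // the operation the test cares about
            discoveryDone.countDown();  // signal completion explicitly
        }
    }

    /** Starts the fetcher and blocks until it signals, instead of sleeping for a guessed duration. */
    public static int runAndAwaitDiscovery() throws InterruptedException {
        FakeFetcher fetcher = new FakeFetcher();
        Thread thread = new Thread(fetcher, "fake-fetcher");
        thread.start();

        // Flaky variant: Thread.sleep(1000) and hope the operation has finished by then.
        // Deterministic variant: wait for the signal; the timeout only guards against hangs.
        if (!fetcher.discoveryDone.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("fetcher did not signal within the timeout");
        }
        thread.join();
        return fetcher.discoveredShards.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("discovered shards: " + runAndAwaitDiscovery());
    }
}
```

With a latch, the test proceeds as soon as the operation has happened and never earlier, so its outcome no longer depends on how fast the machine running it is.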

## Brief change log

- 94b4591: Several minor cleanups of confusing implementations / code 
smells in the `KinesisDataFetcherTest` and related test classes. The commit 
message explains what exactly was changed.
- 547d19f: Remove thread sleeps in unit tests, and replace them with 
`OneShotLatch`.


## Verifying this change

No test coverage should have been affected by this change.
The existing tests in `KinesisDataFetcherTest` verify this.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / *no*)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / *no*)
  - The serializers: (yes / *no* / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / *no* 
/ don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
  - The S3 file system connector: (yes / *no* / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / *no*)
  - If yes, how is the feature documented? (*not applicable* / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8398

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5268.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5268


commit 94b45919afa5a3ec3ce68c45e57f7989397f9640
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T02:11:31Z

[FLINK-8398] [kinesis, tests] Cleanup confusing implementations in 
KinesisDataFetcherTest and related classes

The previous implementation of the TestableKinesisDataFetcher was
confusing in various ways, making it hard to re-use for other
tests. This commit contains the following cleanups:

- Remove confusing mocks of source context and checkpoint lock. We now
  allow users of the TestableKinesisDataFetcher to provide a source
  context, which should provide the checkpoint lock.
- Remove override of emitRecordAndUpdateState(). Strictly speaking, that
  method should be final. It was previously overridden to allow
  verifying how many records were output by the fetcher. That
  verification would be better implemented within a mock source context.
- Properly parameterize the output type for the
  TestableKinesisDataFetcher.
- Remove use of PowerMockito in KinesisDataFetcherTest.
- Use CheckedThreads to properly capture any exceptions in fetcher /
  consumer threads in unit tests.
- Use assertEquals / assertNull instead of assertTrue wherever
  appropriate.
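
The point about capturing exceptions from fetcher / consumer threads can be illustrated with a small sketch. Flink's test utilities provide a `CheckedThread` class; the helper below is a simplified, hypothetical stand-in written for this example (the names `CheckedThreadSketch`, `CheckedRunner`, and `runFailingTask` are assumptions), not that class's actual implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CheckedThreadSketch {

    /** Hypothetical helper: a thread that remembers any failure of its body. */
    public abstract static class CheckedRunner extends Thread {
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        /** The work to run; unlike Runnable.run(), it may throw checked exceptions. */
        public abstract void go() throws Exception;

        @Override
        public final void run() {
            try {
                go();
            } catch (Throwable t) {
                error.set(t); // keep the failure instead of losing it in the background thread
            }
        }

        /** Joins the thread and rethrows anything go() threw, wrapped in an Exception. */
        public void sync() throws Exception {
            join();
            Throwable t = error.get();
            if (t != null) {
                throw new Exception("background thread failed", t);
            }
        }
    }

    /** Runs a task that always fails and returns the message surfaced by sync(), or null. */
    public static String runFailingTask() throws InterruptedException {
        CheckedRunner runner = new CheckedRunner() {
            @Override
            public void go() {
                throw new IllegalStateException("shard discovery failed");
            }
        };
        runner.start();
        try {
            runner.sync();
            return null; // not reached in this example: the task always throws
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("surfaced failure: " + runFailingTask());
    }
}
```

With a plain `Thread`, an exception thrown in the background would only be reported by the thread's uncaught-exception handler and the test could still pass; rethrowing on `sync()` turns it into a test failure.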

commit 547d19f9196512231661f427f3792f2e1f831339
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-10T05:41:49Z

[FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests

Prior to this commit, several unit tests in KinesisDataFetcherTest
relied on sleeps to wait until a certain operation happens, in order for
the test to pass.

This commit removes those sleeps and replaces the test behaviours with
OneShotLatches.



