[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests
[ https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354370#comment-16354370 ]

Tzu-Li (Gordon) Tai commented on FLINK-8398:

Merged.

1.4 - ca1e525c5c2ddb2b36fcdd7fe8d2e7b053a063cf
1.5 - 3f0d6c618302e4b341e5a442126e6ba2889cd2f4

> Stabilize flaky KinesisDataFetcherTests
> ---
>
>                 Key: FLINK-8398
>                 URL: https://issues.apache.org/jira/browse/FLINK-8398
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector, Tests
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.5.0, 1.4.1
>
>
> The unit tests in {{KinesisDataFetcherTest}} have very flaky implementations.
> They rely on thread sleeps to wait for a certain operation to happen; a sleep can easily end before the operation has happened, causing the test to fail.
> Although there aren't any reports of consistent failures on these tests yet (as far as I am aware), such failures can easily surface in the future.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
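For readers unfamiliar with the problem described in the issue, here is a minimal sketch of waiting on a latch instead of sleeping. It is an illustration only, not code from the Flink test suite: `java.util.concurrent.CountDownLatch` with a count of 1 stands in for Flink's `OneShotLatch`, and the `Worker` class and `runAndAwait` helper are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustration only: CountDownLatch(1) stands in for Flink's OneShotLatch. */
class LatchInsteadOfSleep {

    /** Hypothetical worker that signals once it reaches the step the test cares about. */
    static class Worker implements Runnable {
        final CountDownLatch started = new CountDownLatch(1);
        volatile boolean didWork;

        @Override
        public void run() {
            didWork = true;
            started.countDown(); // signal: the awaited operation has happened
        }
    }

    /** Returns true if the worker signalled within the timeout. */
    static boolean runAndAwait(long timeoutMillis) throws InterruptedException {
        Worker worker = new Worker();
        Thread thread = new Thread(worker);
        thread.start();
        // Block until the worker signals, instead of Thread.sleep(n) and hoping n was long enough.
        boolean signalled = worker.started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        thread.join();
        return signalled && worker.didWork;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndAwait(5000));
    }
}
```

The timeout here only bounds how long a broken test can hang; on the success path the test proceeds as soon as the worker signals, so it is neither slower than necessary nor dependent on scheduling luck.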
[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests
[ https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354355#comment-16354355 ]

ASF GitHub Bot commented on FLINK-8398:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5268
[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests
[ https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346738#comment-16346738 ]

ASF GitHub Bot commented on FLINK-8398:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5268

    Thanks for the review @zentol.
    Will rebase, address your comment, and then merge.
[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests
[ https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343235#comment-16343235 ]

ASF GitHub Bot commented on FLINK-8398:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5268#discussion_r164401125

    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java ---
    @@ -18,123 +18,110 @@
     package org.apache.flink.streaming.connectors.kinesis.testutils;

     import org.apache.flink.api.common.functions.RuntimeContext;
    -import org.apache.flink.api.common.serialization.SimpleStringSchema;
     import org.apache.flink.core.testutils.OneShotLatch;
     import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
     import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
    -import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
    +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
     import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
     import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
    -import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;

     import org.mockito.Mockito;
     import org.mockito.invocation.InvocationOnMock;
    -import org.mockito.stubbing.Answer;

     import java.util.HashMap;
     import java.util.LinkedList;
     import java.util.List;
     import java.util.Properties;
    +import java.util.concurrent.ExecutorService;
     import java.util.concurrent.atomic.AtomicReference;

    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
     /**
      * Extension of the {@link KinesisDataFetcher} for testing.
      */
    -public class TestableKinesisDataFetcher extends KinesisDataFetcher {
    -
    -    private static final Object fakeCheckpointLock = new Object();
    -
    -    private long numElementsCollected;
    +public class TestableKinesisDataFetcher extends KinesisDataFetcher {

         private OneShotLatch runWaiter;
    +    private OneShotLatch initialDiscoveryWaiter;
    +
    +    private volatile boolean running;

         public TestableKinesisDataFetcher(
                 List fakeStreams,
    +            SourceFunction.SourceContext sourceContext,
                 Properties fakeConfiguration,
    +            KinesisDeserializationSchema deserializationSchema,
                 int fakeTotalCountOfSubtasks,
                 int fakeIndexOfThisSubtask,
                 AtomicReference thrownErrorUnderTest,
                 LinkedList subscribedShardsStateUnderTest,
                 HashMap subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
                 KinesisProxyInterface fakeKinesis) {
    -        super(fakeStreams,
    -            getMockedSourceContext(),
    -            fakeCheckpointLock,
    +        super(
    +            fakeStreams,
    +            sourceContext,
    +            sourceContext.getCheckpointLock(),
                 getMockedRuntimeContext(fakeTotalCountOfSubtasks, fakeIndexOfThisSubtask),
                 fakeConfiguration,
    -            new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
    +            deserializationSchema,
                 thrownErrorUnderTest,
                 subscribedShardsStateUnderTest,
                 subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
                 fakeKinesis);

    -        this.numElementsCollected = 0;
             this.runWaiter = new OneShotLatch();
    -    }
    +        this.initialDiscoveryWaiter = new OneShotLatch();

    -    public long getNumOfElementsCollected() {
    -        return numElementsCollected;
    +        this.running = true;
         }

         @Override
    -    protected KinesisDeserializationSchema getClonedDeserializationSchema() {
    -        return new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema());
    +    public void runFetcher() throws Exception {
    +        runWaiter.trigger();
    +        super.runFetcher();
    +    }
    +
    +    public void waitUntilRun() throws Exception {
    +        runWaiter.await();
         }

         @Override
    -    protected void emitRecordAndUpdateState(String record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
    -        synchronized (fakeCheckpoi
[jira] [Commented] (FLINK-8398) Stabilize flaky KinesisDataFetcherTests
[ https://issues.apache.org/jira/browse/FLINK-8398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319741#comment-16319741 ]

ASF GitHub Bot commented on FLINK-8398:
---

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5268

    [FLINK-8398] [kinesis, tests] Harden KinesisDataFetcherTest unit tests

    ## What is the purpose of the change

    Prior to this PR, many of the `KinesisDataFetcherTest` unit tests relied on thread sleeps to wait until a certain operation had occurred before the test could pass.
    This approach is flaky; the sleeps should be replaced with `OneShotLatch`.

    ## Brief change log

    - 94b4591: Several minor cleanups of confusing implementations / code smells in the `KinesisDataFetcherTest` and related test classes. The commit message explains what exactly was changed.
    - 547d19f: Remove thread sleeps in unit tests, and replace them with `OneShotLatch`.

    ## Verifying this change

    No test coverage should have been affected by this change.
    The existing tests in `KinesisDataFetcherTest` verify this.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (yes / *no*)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / *no*)
    - The serializers: (yes / *no* / don't know)
    - The runtime per-record code paths (performance sensitive): (yes / *no* / don't know)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
    - The S3 file system connector: (yes / *no* / don't know)

    ## Documentation

    - Does this pull request introduce a new feature? (yes / *no*)
    - If yes, how is the feature documented? (*not applicable* / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8398

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5268.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5268

commit 94b45919afa5a3ec3ce68c45e57f7989397f9640
Author: Tzu-Li (Gordon) Tai
Date:   2018-01-10T02:11:31Z

    [FLINK-8398] [kinesis, tests] Cleanup confusing implementations in KinesisDataFetcherTest and related classes

    The previous implementation of the TestableKinesisDataFetcher was
    confusing in various ways, making it hard to reuse for other tests.
    This commit contains the following cleanups:

    - Remove confusing mocks of source context and checkpoint lock. We now
      allow users of the TestableKinesisDataFetcher to provide a source
      context, which should provide the checkpoint lock.
    - Remove override of emitRecordAndUpdateState(). Strictly speaking, that
      method should be final. It was previously overridden to allow
      verifying how many records were output by the fetcher. That
      verification would be better implemented within a mock source context.
    - Properly parameterize the output type for the
      TestableKinesisDataFetcher.
    - Remove use of PowerMockito in KinesisDataFetcherTest.
    - Use CheckedThreads to properly capture any exceptions in fetcher /
      consumer threads in unit tests.
    - Use assertEquals / assertNull instead of assertTrue wherever
      appropriate.

commit 547d19f9196512231661f427f3792f2e1f831339
Author: Tzu-Li (Gordon) Tai
Date:   2018-01-10T05:41:49Z

    [FLINK-8398] [kinesis, tests] Stabilize flaky KinesisDataFetcherTests

    Prior to this commit, several unit tests in KinesisDataFetcherTest
    relied on sleeps to wait until a certain operation happens, in order for
    the test to pass.

    This commit removes those sleeps and replaces the test behaviours with
    OneShotLatches.
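The cleanup list in the first commit mentions capturing exceptions thrown in fetcher / consumer threads. The general idea can be sketched as below. This is a simplified, hypothetical stand-in written for illustration (the names `CapturingThread`, `go`, `sync` and `CapturingThreadDemo` are assumptions here), not Flink's actual `CheckedThread` implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a "checked" test thread; not Flink's CheckedThread. */
abstract class CapturingThread extends Thread {
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /** The work to run on this thread; may throw. */
    protected abstract void go() throws Exception;

    @Override
    public final void run() {
        try {
            go();
        } catch (Throwable t) {
            error.set(t); // remember the failure instead of losing it on the worker thread
        }
    }

    /** Waits for the thread to finish and rethrows anything it threw. */
    public void sync() throws Exception {
        join();
        Throwable t = error.get();
        if (t instanceof Exception) {
            throw (Exception) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        if (t != null) {
            throw new RuntimeException(t);
        }
    }
}

class CapturingThreadDemo {
    /** Returns the message of the exception surfaced by sync(), or null if none was thrown. */
    static String failureMessage() throws InterruptedException {
        CapturingThread thread = new CapturingThread() {
            @Override
            protected void go() {
                throw new IllegalStateException("fetcher failed");
            }
        };
        thread.start();
        try {
            thread.sync();
            return null;
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(failureMessage()); // prints "fetcher failed"
    }
}
```

Without this kind of wrapper, an exception on a plain `Thread` only reaches the uncaught-exception handler, so the test method itself can pass even though the code under test failed.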