This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/main by this push: new b38f9a1 Made Kinesis tests hopefully more robust. b38f9a1 is described below commit b38f9a13f6031838bda8d952683b47af4b6f3ff4 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Sun May 16 00:02:36 2021 +0200 Made Kinesis tests hopefully more robust. --- .../aws/v2/kinesis/common/KinesisUtils.java | 32 +++++++++++++++++----- .../pubsub/sink/CamelSinkGooglePubSubITCase.java | 2 +- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java index de2e59c..2f86837 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java @@ -34,12 +34,14 @@ import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; @@ -76,13 +78,7 @@ public final class KinesisUtils { public static void createStream(KinesisClient kinesisClient, String streamName) { try { LOG.info("Checking whether the stream exists already"); - DescribeStreamRequest request = DescribeStreamRequest.builder() - .streamName(streamName) - .build(); - - DescribeStreamResponse response = kinesisClient.describeStream(request); - - int status = response.sdkHttpResponse().statusCode(); + int status = getStreamStatus(kinesisClient, streamName); LOG.info("Kinesis stream check result: {}", status); } catch (KinesisException e) { if (LOG.isTraceEnabled()) { @@ -92,9 +88,31 @@ public final class KinesisUtils { } doCreateStream(kinesisClient, streamName); + TestUtils.waitFor(() -> { + try { + GetRecordsRequest getRecordsRequest = KinesisUtils.getGetRecordsRequest(kinesisClient, streamName); + GetRecordsResponse response = kinesisClient.getRecords(getRecordsRequest); + List<Record> recordList = response.records(); + LOG.debug("Checking for stream creation by reading {} records: SUCCESS!", recordList.size()); + return true; + } catch (Exception exc) { + LOG.debug("Checking for stream creation by reading records: FAILURE, retrying.."); + return false; + } + }); } } + private static int getStreamStatus(KinesisClient kinesisClient, String streamName) { + DescribeStreamRequest request = DescribeStreamRequest.builder() + .streamName(streamName) + .build(); + + DescribeStreamResponse response = kinesisClient.describeStream(request); + + return response.sdkHttpResponse().statusCode(); + } + public static void doDeleteStream(KinesisClient kinesisClient, String streamName) { DeleteStreamRequest request = DeleteStreamRequest.builder() .streamName(streamName) diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java index af99586..dc56875 100644 --- a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java +++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java @@ -92,7 +92,7 @@ public class CamelSinkGooglePubSubITCase extends CamelSinkTestSupport { protected void verifyMessages(CountDownLatch latch) throws InterruptedException { List<String> receivedMessages = easyClient.getReceivedMessages(); - if (latch.await(30, TimeUnit.SECONDS)) { + if (latch.await(40, TimeUnit.SECONDS)) { assertEquals(expected, receivedMessages.size(), "Did not receive as many messages as was sent"); } else { fail("Failed to receive the messages within the specified time");