This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/main by this push:
     new b38f9a1  Made Kinesis tests hopefully more robust.
b38f9a1 is described below

commit b38f9a13f6031838bda8d952683b47af4b6f3ff4
Author: Andrea Tarocchi <andrea.taroc...@gmail.com>
AuthorDate: Sun May 16 00:02:36 2021 +0200

    Made Kinesis tests hopefully more robust.
---
 .../aws/v2/kinesis/common/KinesisUtils.java        | 32 +++++++++++++++++-----
 .../pubsub/sink/CamelSinkGooglePubSubITCase.java   |  2 +-
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
index de2e59c..2f86837 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
@@ -34,12 +34,14 @@ import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.KinesisException;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.kinesis.model.Shard;
@@ -76,13 +78,7 @@ public final class KinesisUtils {
     public static void createStream(KinesisClient kinesisClient, String 
streamName) {
         try {
             LOG.info("Checking whether the stream exists already");
-            DescribeStreamRequest request = DescribeStreamRequest.builder()
-                    .streamName(streamName)
-                    .build();
-
-            DescribeStreamResponse response = 
kinesisClient.describeStream(request);
-
-            int status = response.sdkHttpResponse().statusCode();
+            int status = getStreamStatus(kinesisClient, streamName);
             LOG.info("Kinesis stream check result: {}", status);
         } catch (KinesisException e) {
             if (LOG.isTraceEnabled()) {
@@ -92,9 +88,31 @@ public final class KinesisUtils {
             }
 
             doCreateStream(kinesisClient, streamName);
+            TestUtils.waitFor(() -> {
+                try {
+                    GetRecordsRequest getRecordsRequest = 
KinesisUtils.getGetRecordsRequest(kinesisClient, streamName);
+                    GetRecordsResponse response = 
kinesisClient.getRecords(getRecordsRequest);
+                    List<Record> recordList = response.records();
+                    LOG.debug("Checking for stream creation by reading {} 
records: SUCCESS!", recordList.size());
+                    return true;
+                } catch (Exception exc) {
+                    LOG.debug("Checking for stream creation by reading 
records: FAILURE, retrying..");
+                    return false;
+                }
+            });
         }
     }
 
+    private static int getStreamStatus(KinesisClient kinesisClient, String 
streamName) {
+        DescribeStreamRequest request = DescribeStreamRequest.builder()
+                .streamName(streamName)
+                .build();
+
+        DescribeStreamResponse response = 
kinesisClient.describeStream(request);
+
+        return response.sdkHttpResponse().statusCode();
+    }
+
     public static void doDeleteStream(KinesisClient kinesisClient, String 
streamName) {
         DeleteStreamRequest request = DeleteStreamRequest.builder()
                 .streamName(streamName)
diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
index af99586..dc56875 100644
--- a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
+++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java
@@ -92,7 +92,7 @@ public class CamelSinkGooglePubSubITCase extends CamelSinkTestSupport {
     protected void verifyMessages(CountDownLatch latch) throws 
InterruptedException {
         List<String> receivedMessages = easyClient.getReceivedMessages();
 
-        if (latch.await(30, TimeUnit.SECONDS)) {
+        if (latch.await(40, TimeUnit.SECONDS)) {
             assertEquals(expected, receivedMessages.size(), "Did not receive 
as many messages as was sent");
         } else {
             fail("Failed to receive the messages within the specified time");

Reply via email to