This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b44257  (chores) camel-kafka: fix test issues and enable more tests
1b44257 is described below

commit 1b442574c21948717db5ffbae6faf0d92f0ed80e
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Tue Sep 21 13:35:51 2021 +0200

    (chores) camel-kafka: fix test issues and enable more tests
    
    Enable the idempotent tests by default after fixing problems in a few of
    our tests
---
 .../component/kafka/MockConsumerInterceptor.java   |  6 +++++-
 .../kafka/integration/KafkaConsumerFullIT.java     | 23 ++++++++++++----------
 ...kaConsumerIdempotentWithCustomSerializerIT.java |  2 --
 .../KafkaConsumerIdempotentWithProcessorIT.java    |  6 ++----
 .../integration/KafkaConsumerManualCommitIT.java   | 15 --------------
 .../integration/KafkaConsumerTopicIsPatternIT.java |  3 ---
 6 files changed, 20 insertions(+), 35 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java
index 109f388..4f63214 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java
@@ -23,13 +23,17 @@ import org.apache.kafka.clients.consumer.ConsumerInterceptor;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MockConsumerInterceptor implements ConsumerInterceptor<String, 
String> {
-
     public static ArrayList<ConsumerRecords<String, String>> recordsCaptured = 
new ArrayList<>();
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(MockConsumerInterceptor.class);
+
     @Override
     public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, 
String> consumerRecords) {
+        consumerRecords.forEach(r -> LOG.trace("Captured on mock: {}", 
r.value()));
         recordsCaptured.add(consumerRecords);
         return consumerRecords;
     }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index f4a9dd4..daf1454 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -39,19 +39,19 @@ import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", 
matches = "true",
-                          disabledReason = "Runtime conflicts with the 
idempotency tests")
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
+    public static final String TOPIC = "test-full";
 
-    public static final String TOPIC = "test";
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaConsumerFullIT.class);
 
     @BindToRegistry("myHeaderDeserializer")
     private MyKafkaHeaderDeserializer deserializer = new 
MyKafkaHeaderDeserializer();
@@ -71,6 +71,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
     public void before() {
         Properties props = getDefaultProperties();
         producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
     }
 
     @AfterEach
@@ -87,8 +88,9 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         return new RouteBuilder() {
 
             @Override
-            public void configure() throws Exception {
-                from(from).routeId("foo").to(to);
+            public void configure() {
+                from(from).process(exchange -> LOG.trace("Captured on the 
processor: {}", exchange.getMessage().getBody()))
+                        .routeId("full-it").to(to);
             }
         };
     }
@@ -116,6 +118,7 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
 
         to.assertIsSatisfied(3000);
 
+        Thread.sleep(1000);
         assertEquals(5, 
StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(),
 false)
                 .count());
 
@@ -161,12 +164,12 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", 
"message-2", "message-3", "message-4");
 
         // Restart endpoint,
-        context.getRouteController().stopRoute("foo");
+        context.getRouteController().stopRoute("full-it");
 
         KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
         kafkaEndpoint.getConfiguration().setSeekTo("beginning");
 
-        context.getRouteController().startRoute("foo");
+        context.getRouteController().startRoute("full-it");
 
         // As wee set seek to beginning we should re-consume all messages
         to.assertIsSatisfied(3000);
@@ -189,12 +192,12 @@ public class KafkaConsumerFullIT extends 
BaseEmbeddedKafkaTestSupport {
         to.expectedMessageCount(0);
 
         // Restart endpoint,
-        context.getRouteController().stopRoute("foo");
+        context.getRouteController().stopRoute("full-it");
 
         KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from;
         kafkaEndpoint.getConfiguration().setSeekTo("end");
 
-        context.getRouteController().startRoute("foo");
+        context.getRouteController().startRoute("full-it");
 
         // As wee set seek to end we should not re-consume any messages
         synchronized (this) {
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
index 5a37f0d..14a5d92 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -29,9 +29,7 @@ import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 
-@EnabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", 
matches = "true")
 public class KafkaConsumerIdempotentWithCustomSerializerIT extends 
KafkaConsumerIdempotentTestSupport {
 
     public static final String TOPIC = "idempt2";
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
index 4f0bd9e..c8c2e14 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -30,9 +30,7 @@ import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 
-@EnabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", 
matches = "true")
 public class KafkaConsumerIdempotentWithProcessorIT extends 
KafkaConsumerIdempotentTestSupport {
     public static final String TOPIC = "testidemp3";
 
@@ -48,7 +46,7 @@ public class KafkaConsumerIdempotentWithProcessorIT extends 
KafkaConsumerIdempot
                     + 
"&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
-    @EndpointInject("mock:result")
+    @EndpointInject("mock:resulti")
     private MockEndpoint to;
 
     private int size = 200;
@@ -72,7 +70,7 @@ public class KafkaConsumerIdempotentWithProcessorIT extends 
KafkaConsumerIdempot
 
             @Override
             public void configure() throws Exception {
-                from(from).routeId("foo")
+                from(from).routeId("idemp-with-prop")
                         .process(exchange -> {
                             byte[] id = exchange.getIn().getHeader("id", 
byte[].class);
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
index 798c508..6a88e3c 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java
@@ -107,9 +107,6 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport {
         // start a new route in order to rebalance kafka
         context.getRouteController().startRoute("bar");
         toBar.expectedMessageCount(1);
-        synchronized (this) {
-            Thread.sleep(1000);
-        }
 
         toBar.assertIsSatisfied();
 
@@ -120,11 +117,6 @@ public class KafkaConsumerManualCommitIT extends 
BaseEmbeddedKafkaTestSupport {
         to.expectedMessageCount(1);
         to.expectedBodiesReceivedInAnyOrder("message-1");
 
-        // give some time for the route to start again
-        synchronized (this) {
-            Thread.sleep(1000);
-        }
-
         to.assertIsSatisfied(3000);
     }
 
@@ -146,8 +138,6 @@ public class KafkaConsumerManualCommitIT extends 
BaseEmbeddedKafkaTestSupport {
 
         to.reset();
 
-        //        Thread.sleep(5000);
-
         // Second step: We shut down our route, we expect nothing will be 
recovered by our route
         context.getRouteController().stopRoute("foo");
         to.expectedMessageCount(0);
@@ -168,11 +158,6 @@ public class KafkaConsumerManualCommitIT extends 
BaseEmbeddedKafkaTestSupport {
         context.getRouteController().startRoute("foo");
         to.expectedMessageCount(3);
 
-        // give some time for the route to start again
-        synchronized (this) {
-            Thread.sleep(1000);
-        }
-
         to.assertIsSatisfied(3000);
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
index af9c8ce..50281ed 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
@@ -30,12 +30,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", 
matches = "true",
-                          disabledReason = "Runtime conflicts with the 
idempotency tests")
 public class KafkaConsumerTopicIsPatternIT extends 
BaseEmbeddedKafkaTestSupport {
 
     public static final String TOPIC = "test";

Reply via email to