This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 1b44257 (chores) camel-kafka: fix test issues and enable more tests 1b44257 is described below commit 1b442574c21948717db5ffbae6faf0d92f0ed80e Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Tue Sep 21 13:35:51 2021 +0200 (chores) camel-kafka: fix test issues and enable more tests Enable the idempotent tests by default after fixing problems in a few of our tests --- .../component/kafka/MockConsumerInterceptor.java | 6 +++++- .../kafka/integration/KafkaConsumerFullIT.java | 23 ++++++++++++---------- ...kaConsumerIdempotentWithCustomSerializerIT.java | 2 -- .../KafkaConsumerIdempotentWithProcessorIT.java | 6 ++---- .../integration/KafkaConsumerManualCommitIT.java | 15 -------------- .../integration/KafkaConsumerTopicIsPatternIT.java | 3 --- 6 files changed, 20 insertions(+), 35 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java index 109f388..4f63214 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/MockConsumerInterceptor.java @@ -23,13 +23,17 @@ import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MockConsumerInterceptor implements ConsumerInterceptor<String, String> { - public static ArrayList<ConsumerRecords<String, String>> recordsCaptured = new ArrayList<>(); + private static final Logger LOG = LoggerFactory.getLogger(MockConsumerInterceptor.class); + @Override public ConsumerRecords<String, String> 
onConsume(ConsumerRecords<String, String> consumerRecords) { + consumerRecords.forEach(r -> LOG.trace("Captured on mock: {}", r.value())); recordsCaptured.add(consumerRecords); return consumerRecords; } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java index f4a9dd4..daf1454 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java @@ -39,19 +39,19 @@ import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.camel.test.junit5.TestSupport.assertIsInstanceOf; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "true", - disabledReason = "Runtime conflicts with the idempotency tests") @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { + public static final String TOPIC = "test-full"; - public static final String TOPIC = "test"; + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerFullIT.class); @BindToRegistry("myHeaderDeserializer") private MyKafkaHeaderDeserializer deserializer = new MyKafkaHeaderDeserializer(); @@ -71,6 +71,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { public void before() { Properties props = getDefaultProperties(); 
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + MockConsumerInterceptor.recordsCaptured.clear(); } @AfterEach @@ -87,8 +88,9 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { return new RouteBuilder() { @Override - public void configure() throws Exception { - from(from).routeId("foo").to(to); + public void configure() { + from(from).process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody())) + .routeId("full-it").to(to); } }; } @@ -116,6 +118,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { to.assertIsSatisfied(3000); + Thread.sleep(1000); assertEquals(5, StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(), false) .count()); @@ -161,12 +164,12 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); // Restart endpoint, - context.getRouteController().stopRoute("foo"); + context.getRouteController().stopRoute("full-it"); KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from; kafkaEndpoint.getConfiguration().setSeekTo("beginning"); - context.getRouteController().startRoute("foo"); + context.getRouteController().startRoute("full-it"); // As wee set seek to beginning we should re-consume all messages to.assertIsSatisfied(3000); @@ -189,12 +192,12 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { to.expectedMessageCount(0); // Restart endpoint, - context.getRouteController().stopRoute("foo"); + context.getRouteController().stopRoute("full-it"); KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) from; kafkaEndpoint.getConfiguration().setSeekTo("end"); - context.getRouteController().startRoute("foo"); + context.getRouteController().startRoute("full-it"); // As wee set seek to end we should not re-consume any messages synchronized (this) { diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java index 5a37f0d..14a5d92 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java @@ -29,9 +29,7 @@ import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; -@EnabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "true") public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumerIdempotentTestSupport { public static final String TOPIC = "idempt2"; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java index 4f0bd9e..c8c2e14 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java @@ -30,9 +30,7 @@ import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; -@EnabledIfSystemProperty(named = 
"enable.kafka.consumer.idempotency.tests", matches = "true") public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempotentTestSupport { public static final String TOPIC = "testidemp3"; @@ -48,7 +46,7 @@ public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempot + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; - @EndpointInject("mock:result") + @EndpointInject("mock:resulti") private MockEndpoint to; private int size = 200; @@ -72,7 +70,7 @@ public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempot @Override public void configure() throws Exception { - from(from).routeId("foo") + from(from).routeId("idemp-with-prop") .process(exchange -> { byte[] id = exchange.getIn().getHeader("id", byte[].class); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java index 798c508..6a88e3c 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerManualCommitIT.java @@ -107,9 +107,6 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport { // start a new route in order to rebalance kafka context.getRouteController().startRoute("bar"); toBar.expectedMessageCount(1); - synchronized (this) { - Thread.sleep(1000); - } toBar.assertIsSatisfied(); @@ -120,11 +117,6 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport { to.expectedMessageCount(1); to.expectedBodiesReceivedInAnyOrder("message-1"); - // give some time for the route to start again - synchronized (this) { - Thread.sleep(1000); - } - to.assertIsSatisfied(3000); } @@ -146,8 +138,6 @@ public class 
KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport { to.reset(); - // Thread.sleep(5000); - // Second step: We shut down our route, we expect nothing will be recovered by our route context.getRouteController().stopRoute("foo"); to.expectedMessageCount(0); @@ -168,11 +158,6 @@ public class KafkaConsumerManualCommitIT extends BaseEmbeddedKafkaTestSupport { context.getRouteController().startRoute("foo"); to.expectedMessageCount(3); - // give some time for the route to start again - synchronized (this) { - Thread.sleep(1000); - } - to.assertIsSatisfied(3000); } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java index af9c8ce..50281ed 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java @@ -30,12 +30,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import static org.junit.jupiter.api.Assertions.assertEquals; -@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests", matches = "true", - disabledReason = "Runtime conflicts with the idempotency tests") public class KafkaConsumerTopicIsPatternIT extends BaseEmbeddedKafkaTestSupport { public static final String TOPIC = "test";