http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index a85563d..6fd9053 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -16,104 +16,36 @@ */ package org.apache.nifi.processors.kafka.pubsub; -import java.nio.charset.StandardCharsets; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.UUID; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.junit.Before; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; public class ConsumeKafkaTest { - static class MockConsumerPool extends ConsumerPool { - - final int actualMaxLeases; - final List<String> actualTopics; - final Map<String, String> actualKafkaProperties; - boolean throwKafkaExceptionOnPoll = false; - boolean throwKafkaExceptionOnCommit = false; - Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>(); - Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null; - Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null; - boolean wasConsumerLeasePoisoned = false; - boolean wasConsumerLeaseClosed = false; - boolean wasPoolClosed = false; - - public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) { - super(maxLeases, topics, kafkaProperties, null); - actualMaxLeases = maxLeases; - actualTopics = topics; - actualKafkaProperties = kafkaProperties; - } - - @Override - public ConsumerLease obtainConsumer() { - return new ConsumerLease() { - @Override - public ConsumerRecords<byte[], byte[]> poll() { - if (throwKafkaExceptionOnPoll) { - throw new KafkaException("i planned to fail"); - } - final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll(); - return (records == null) ? ConsumerRecords.empty() : records; - } - - @Override - public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) { - if (throwKafkaExceptionOnCommit) { - throw new KafkaException("i planned to fail"); - } - actualCommitOffsets = offsets; - } - - @Override - public void poison() { - wasConsumerLeasePoisoned = true; - } - - @Override - public void close() { - wasConsumerLeaseClosed = true; - } - }; - } - - @Override - public void close() { - wasPoolClosed = true; - } - - void resetState() { - throwKafkaExceptionOnPoll = false; - throwKafkaExceptionOnCommit = false; - nextPlannedRecordsQueue = null; - nextExpectedCommitOffsets = null; - wasConsumerLeasePoisoned = false; - wasConsumerLeaseClosed = false; - wasPoolClosed = false; - } + Consumer<byte[], byte[]> mockConsumer = null; + ConsumerLease mockLease = null; + ConsumerPool mockConsumerPool = null; + @Before + public void setup() { + mockConsumer = mock(Consumer.class); + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); } @Test @@ -174,31 +106,14 @@ public class ConsumeKafkaTest { public void validateGetAllMessages() throws Exception { String groupName = "validateGetAllMessages"; - final byte[][] firstPassValues = new byte[][]{ - "Hello-1".getBytes(StandardCharsets.UTF_8), - "Hello-2".getBytes(StandardCharsets.UTF_8), - "Hello-3".getBytes(StandardCharsets.UTF_8) - }; - final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); - - final byte[][] secondPassValues = new byte[][]{ - "Hello-4".getBytes(StandardCharsets.UTF_8), - "Hello-5".getBytes(StandardCharsets.UTF_8), - "Hello-6".getBytes(StandardCharsets.UTF_8) - }; - final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues); - - final List<String> expectedTopics = new ArrayList<>(); - expectedTopics.add("foo"); - expectedTopics.add("bar"); - final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); - mockPool.nextPlannedRecordsQueue.add(firstRecs); - mockPool.nextPlannedRecordsQueue.add(secondRecs); + when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { @Override - protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { - return mockPool; + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; } }; final TestRunner runner = TestRunners.newTestRunner(proc); @@ -207,69 +122,29 @@ public class ConsumeKafkaTest { runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.run(1, false); - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); - - assertEquals(expectedTopics, mockPool.actualTopics); - - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count()); - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count()); - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count()); - - if (mockPool.nextPlannedRecordsQueue.isEmpty()) { - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count()); - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count()); - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count()); - assertEquals(2, mockPool.actualCommitOffsets.size()); - assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset()); - assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset()); - } else { - assertEquals(2, mockPool.actualCommitOffsets.size()); - assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset()); - } - - //asert that all consumers were closed as expected - //assert that the consumer pool was properly closed - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertFalse(mockPool.wasPoolClosed); - runner.run(1, true); - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertTrue(mockPool.wasPoolClosed); - + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); } @Test - public void validateGetLotsOfMessages() throws Exception { - String groupName = "validateGetLotsOfMessages"; - - final byte[][] firstPassValues = new byte[10010][1]; - for (final byte[] value : firstPassValues) { - value[0] = 0x12; - } - final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; - final byte[][] secondPassValues = new byte[][]{ - "Hello-4".getBytes(StandardCharsets.UTF_8), - "Hello-5".getBytes(StandardCharsets.UTF_8), - "Hello-6".getBytes(StandardCharsets.UTF_8) - }; - final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues); - - final List<String> expectedTopics = new ArrayList<>(); - expectedTopics.add("foo"); - expectedTopics.add("bar"); - final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); - mockPool.nextPlannedRecordsQueue.add(firstRecs); - mockPool.nextPlannedRecordsQueue.add(secondRecs); + when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { @Override - protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { - return mockPool; + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; } }; final TestRunner runner = TestRunners.newTestRunner(proc); @@ -278,352 +153,15 @@ public class ConsumeKafkaTest { runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.run(1, false); - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); - - assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count()); - assertEquals(1, mockPool.nextPlannedRecordsQueue.size()); - - assertEquals(1, mockPool.actualCommitOffsets.size()); - assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset()); - - //asert that all consumers were closed as expected - //assert that the consumer pool was properly closed - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertFalse(mockPool.wasPoolClosed); - runner.run(1, true); - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertTrue(mockPool.wasPoolClosed); - + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); } - @SuppressWarnings({"rawtypes", "unchecked"}) - private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) { - final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>(); - final TopicPartition tPart = new TopicPartition(topic, partition); - final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); - long offset = startingOffset; - for (final byte[] rawRecord : rawRecords) { - final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord); - records.add(rec); - } - map.put(tPart, records); - return new ConsumerRecords(map); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) { - final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>(); - final TopicPartition tPart = new TopicPartition(topic, partition); - final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); - long offset = startingOffset; - - for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) { - final byte[] key = entry.getKey(); - final byte[] rawRecord = entry.getValue(); - final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord); - records.add(rec); - } - map.put(tPart, records); - return new ConsumerRecords(map); - } - - private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) { - final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>(); - for (final ConsumerRecords<byte[], byte[]> rec : records) { - rec.partitions().stream().forEach((part) -> { - final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part); - if (map.get(part) != null) { - throw new IllegalStateException("already have that topic/partition in the record map"); - } - map.put(part, conRecs); - }); - } - return new ConsumerRecords<>(map); - } - - @Test - public void validateGetAllMessagesWithProvidedDemarcator() throws Exception { - String groupName = "validateGetAllMessagesWithProvidedDemarcator"; - - final byte[][] firstPassValues = new byte[][]{ - "Hello-1".getBytes(StandardCharsets.UTF_8), - "Hello-2".getBytes(StandardCharsets.UTF_8), - "Hello-3".getBytes(StandardCharsets.UTF_8) - }; - - final byte[][] secondPassValues = new byte[][]{ - "Hello-4".getBytes(StandardCharsets.UTF_8), - "Hello-5".getBytes(StandardCharsets.UTF_8), - "Hello-6".getBytes(StandardCharsets.UTF_8) - }; - final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords( - createConsumerRecords("foo", 1, 1L, firstPassValues), - createConsumerRecords("bar", 1, 1L, secondPassValues) - ); - - final List<String> expectedTopics = new ArrayList<>(); - expectedTopics.add("foo"); - expectedTopics.add("bar"); - final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); - mockPool.nextPlannedRecordsQueue.add(consumerRecs); - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { - return mockPool; - } - }; - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah"); - - runner.run(1, false); - - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); - - assertEquals(2, flowFiles.size()); - - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count()); - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count()); - - //asert that all consumers were closed as expected - //assert that the consumer pool was properly closed - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertFalse(mockPool.wasPoolClosed); - runner.run(1, true); - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertTrue(mockPool.wasPoolClosed); - - assertEquals(2, mockPool.actualCommitOffsets.size()); - assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset()); - assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset()); - } - - @Test - public void validatePollException() throws Exception { - String groupName = "validatePollException"; - - final byte[][] firstPassValues = new byte[][]{ - "Hello-1".getBytes(StandardCharsets.UTF_8), - "Hello-2".getBytes(StandardCharsets.UTF_8), - "Hello-3".getBytes(StandardCharsets.UTF_8) - }; - - final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords( - createConsumerRecords("foo", 1, 1L, firstPassValues) - ); - - final List<String> expectedTopics = new ArrayList<>(); - expectedTopics.add("foo"); - final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); - mockPool.nextPlannedRecordsQueue.add(consumerRecs); - mockPool.throwKafkaExceptionOnPoll = true; - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { - return mockPool; - } - }; - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah"); - - runner.run(1, true); - - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); - - assertEquals(0, flowFiles.size()); - assertNull(null, mockPool.actualCommitOffsets); - - //asert that all consumers were closed as expected - //assert that the consumer pool was properly closed - assertTrue(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertTrue(mockPool.wasPoolClosed); - } - - @Test - public void validateCommitOffsetException() throws Exception { - String groupName = "validateCommitOffsetException"; - - final byte[][] firstPassValues = new byte[][]{ - "Hello-1".getBytes(StandardCharsets.UTF_8), - "Hello-2".getBytes(StandardCharsets.UTF_8), - "Hello-3".getBytes(StandardCharsets.UTF_8) - }; - - final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords( - createConsumerRecords("foo", 1, 1L, firstPassValues) - ); - - final List<String> expectedTopics = new ArrayList<>(); - expectedTopics.add("foo"); - final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null); - mockPool.nextPlannedRecordsQueue.add(consumerRecs); - mockPool.throwKafkaExceptionOnCommit = true; - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { - return mockPool; - } - }; - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah"); - - runner.run(1, true); - - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); - - assertEquals(1, flowFiles.size()); - - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count()); - - //asert that all consumers were closed as expected - //assert that the consumer pool was properly closed - assertTrue(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertTrue(mockPool.wasPoolClosed); - - assertNull(null, mockPool.actualCommitOffsets); - } - - @Test - public void validateUtf8Key() { - String groupName = "validateGetAllMessages"; - - final Map<byte[], byte[]> rawRecords = new HashMap<>(); - rawRecords.put("key1".getBytes(), "Hello-1".getBytes()); - rawRecords.put(new byte[0], "Hello-2".getBytes()); - rawRecords.put(null, "Hello-3".getBytes()); - - final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords); - - final List<String> expectedTopics = new ArrayList<>(); - expectedTopics.add("foo"); - expectedTopics.add("bar"); - final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null); - mockPool.nextPlannedRecordsQueue.add(firstRecs); - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { - return mockPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - - runner.run(1, false); - - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); - - assertEquals(expectedTopics, mockPool.actualTopics); - - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count()); - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count()); - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count()); - - assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count()); - assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count()); - assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count()); - - - //asert that all consumers were closed as expected - //assert that the consumer pool was properly closed - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertFalse(mockPool.wasPoolClosed); - runner.run(1, true); - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertTrue(mockPool.wasPoolClosed); - } - - @Test - public void validateHexKey() { - String groupName = "validateGetAllMessages"; - - final Map<byte[], byte[]> rawRecords = new HashMap<>(); - rawRecords.put("key1".getBytes(), "Hello-1".getBytes()); - rawRecords.put(new byte[0], "Hello-2".getBytes()); - rawRecords.put(null, "Hello-3".getBytes()); - - final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords); - - final List<String> expectedTopics = new ArrayList<>(); - expectedTopics.add("foo"); - expectedTopics.add("bar"); - final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null); - mockPool.nextPlannedRecordsQueue.add(firstRecs); - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) { - return mockPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setValidateExpressionUsage(false); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.setProperty(ConsumeKafka_0_10.KEY_ATTRIBUTE_ENCODING, ConsumeKafka_0_10.HEX_ENCODING); - - runner.run(1, false); - - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS); - - assertEquals(expectedTopics, mockPool.actualTopics); - - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count()); - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count()); - assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count()); - - final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase(); - - assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count()); - assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count()); - assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count()); - - - //asert that all consumers were closed as expected - //assert that the consumer pool was properly closed - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertFalse(mockPool.wasPoolClosed); - runner.run(1, true); - assertFalse(mockPool.wasConsumerLeasePoisoned); - assertTrue(mockPool.wasConsumerLeaseClosed); - assertTrue(mockPool.wasPoolClosed); - } }
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java index 7f88ea2..0ebf2b3 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java @@ -16,109 +16,203 @@ */ package org.apache.nifi.processors.kafka.pubsub; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import org.junit.Before; import org.junit.Test; -import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ConsumerPoolTest { Consumer<byte[], byte[]> consumer = null; + ProcessSession mockSession = null; + ProvenanceReporter mockReporter = null; + ConsumerPool testPool = null; + ConsumerPool testDemarcatedPool = null; ComponentLog logger = null; @Before public void setup() { consumer = mock(Consumer.class); logger = mock(ComponentLog.class); - } - - @Test - public void validatePoolSimpleCreateClose() throws Exception { - - final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) { + mockSession = mock(ProcessSession.class); + mockReporter = mock(ProvenanceReporter.class); + when(mockSession.getProvenanceReporter()).thenReturn(mockReporter); + testPool = new ConsumerPool( + 1, + null, + Collections.emptyMap(), + Collections.singletonList("nifi"), + 100L, + "utf-8", + "ssl", + "localhost", + logger) { @Override protected Consumer<byte[], byte[]> createKafkaConsumer() { return consumer; } }; + testDemarcatedPool = new ConsumerPool( + 1, + "--demarcator--".getBytes(StandardCharsets.UTF_8), + Collections.emptyMap(), + Collections.singletonList("nifi"), + 100L, + "utf-8", + "ssl", + "localhost", + logger) { + @Override + protected Consumer<byte[], byte[]> createKafkaConsumer() { + return consumer; + } + }; + } - when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty()); + @Test + public void validatePoolSimpleCreateClose() throws Exception { - try (final ConsumerLease lease = testPool.obtainConsumer()) { + when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + lease.poll(); + } + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + lease.poll(); + } + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + lease.poll(); + } + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { lease.poll(); - lease.commitOffsets(Collections.emptyMap()); } testPool.close(); + verify(mockSession, times(0)).create(); + verify(mockSession, times(0)).commit(); final PoolStats stats = testPool.getPoolStats(); assertEquals(1, stats.consumerCreatedCount); assertEquals(1, stats.consumerClosedCount); - assertEquals(1, stats.leasesObtainedCount); - assertEquals(1, stats.unproductivePollCount); - assertEquals(0, stats.productivePollCount); + assertEquals(4, stats.leasesObtainedCount); } @Test - public void validatePoolSimpleBatchCreateClose() throws Exception { - - final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) { - @Override - protected Consumer<byte[], byte[]> createKafkaConsumer() { - return consumer; - } + public void validatePoolSimpleCreatePollClose() throws Exception { + final byte[][] firstPassValues = new byte[][]{ + "Hello-1".getBytes(StandardCharsets.UTF_8), + "Hello-2".getBytes(StandardCharsets.UTF_8), + "Hello-3".getBytes(StandardCharsets.UTF_8) }; + final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); - when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty()); + when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + lease.poll(); + lease.commit(); + } + testPool.close(); + verify(mockSession, times(3)).create(); + verify(mockSession, times(1)).commit(); + final PoolStats stats = testPool.getPoolStats(); + assertEquals(1, stats.consumerCreatedCount); + assertEquals(1, stats.consumerClosedCount); + assertEquals(1, stats.leasesObtainedCount); + } + @Test + public void validatePoolSimpleBatchCreateClose() throws Exception { + when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); for (int i = 0; i < 100; i++) { - try (final ConsumerLease lease = testPool.obtainConsumer()) { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { for (int j = 0; j < 100; j++) { lease.poll(); } - lease.commitOffsets(Collections.emptyMap()); } } testPool.close(); + verify(mockSession, times(0)).create(); + verify(mockSession, times(0)).commit(); final PoolStats stats = testPool.getPoolStats(); assertEquals(1, stats.consumerCreatedCount); assertEquals(1, stats.consumerClosedCount); assertEquals(100, stats.leasesObtainedCount); - assertEquals(10000, stats.unproductivePollCount); - assertEquals(0, stats.productivePollCount); } @Test - public void validatePoolConsumerFails() throws Exception { - - final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) { - @Override - protected Consumer<byte[], byte[]> createKafkaConsumer() { - return consumer; - } + public void validatePoolBatchCreatePollClose() throws Exception { + final byte[][] firstPassValues = new byte[][]{ + "Hello-1".getBytes(StandardCharsets.UTF_8), + "Hello-2".getBytes(StandardCharsets.UTF_8), + "Hello-3".getBytes(StandardCharsets.UTF_8) }; + final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); - when(consumer.poll(anyInt())).thenThrow(new KafkaException()); - - try (final ConsumerLease lease = testPool.obtainConsumer()) { + when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); + try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) { lease.poll(); - fail(); - } catch (final KafkaException ke) { + lease.commit(); + } + testDemarcatedPool.close(); + verify(mockSession, times(1)).create(); + verify(mockSession, times(1)).commit(); + final PoolStats stats = testDemarcatedPool.getPoolStats(); + assertEquals(1, stats.consumerCreatedCount); + assertEquals(1, stats.consumerClosedCount); + assertEquals(1, stats.leasesObtainedCount); + } + + @Test + public void validatePoolConsumerFails() throws Exception { + + when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops")); + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + try { + lease.poll(); + fail(); + } catch (final KafkaException ke) { + } } testPool.close(); + verify(mockSession, times(0)).create(); + verify(mockSession, times(0)).commit(); final PoolStats stats = testPool.getPoolStats(); assertEquals(1, stats.consumerCreatedCount); assertEquals(1, stats.consumerClosedCount); assertEquals(1, stats.leasesObtainedCount); - assertEquals(0, stats.unproductivePollCount); - assertEquals(0, stats.productivePollCount); } + + @SuppressWarnings({"rawtypes", "unchecked"}) + static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) { + final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>(); + final TopicPartition tPart = new TopicPartition(topic, partition); + final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); + long offset = startingOffset; + for (final byte[] rawRecord : rawRecords) { + final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord); + records.add(rec); + } + map.put(tPart, records); + return new ConsumerRecords(map); + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java deleted file mode 100644 index 19c64af..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.kafka.clients.producer.Partitioner; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult; -import org.apache.nifi.processors.kafka.test.EmbeddedKafka; -import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.ConsumerTimeoutException; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import org.apache.kafka.clients.producer.ProducerConfig; - -public class KafkaPublisherTest { - - private static EmbeddedKafka kafkaLocal; - - private static EmbeddedKafkaProducerHelper producerHelper; - - @BeforeClass - public static void beforeClass() { - kafkaLocal = new EmbeddedKafka(); - kafkaLocal.start(); - producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal); - } - - @AfterClass - public static void afterClass() throws Exception { - producerHelper.close(); - kafkaLocal.stop(); - } - - @Test - public void validateSuccessfulSendAsWhole() throws Exception { - InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8)); - String topicName = "validateSuccessfulSendAsWhole"; - - Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - - PublishingContext publishingContext = new PublishingContext(contentStream, topicName); - KafkaPublisherResult result = publisher.publish(publishingContext); - - assertEquals(0, result.getLastMessageAcked()); - assertEquals(1, result.getMessagesSent()); - contentStream.close(); - publisher.close(); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - assertNotNull(iter.next()); - try { - iter.next(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - } - - @Test - public void validateSuccessfulSendAsDelimited() throws Exception { - InputStream contentStream = new ByteArrayInputStream( - "Hello Kafka\nHello Kafka\nHello Kafka\nHello Kafka\n".getBytes(StandardCharsets.UTF_8)); - String topicName = "validateSuccessfulSendAsDelimited"; - - Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - - PublishingContext publishingContext = new PublishingContext(contentStream, topicName); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - KafkaPublisherResult result = publisher.publish(publishingContext); - - assertEquals(3, result.getLastMessageAcked()); - assertEquals(4, result.getMessagesSent()); - contentStream.close(); - publisher.close(); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - assertNotNull(iter.next()); - assertNotNull(iter.next()); - assertNotNull(iter.next()); - assertNotNull(iter.next()); - try { - iter.next(); - fail(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - } - - /* - * This test simulates the condition where not all messages were ACKed by - * Kafka - */ - @Test - public void validateRetries() throws Exception { - byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8); - InputStream contentStream = new ByteArrayInputStream(testValue); - String topicName = "validateSuccessfulReSendOfFailedSegments"; - - Properties kafkaProperties = this.buildProducerProperties(); - - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - - // simulates the first re-try - int lastAckedMessageIndex = 1; - PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - - publisher.publish(publishingContext); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - String m1 = new String(iter.next().message()); - String m2 = new String(iter.next().message()); - assertEquals("Hello Kafka3", m1); - assertEquals("Hello Kafka4", m2); - try { - iter.next(); - fail(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - - // simulates the second re-try - lastAckedMessageIndex = 2; - contentStream = new ByteArrayInputStream(testValue); - publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - publisher.publish(publishingContext); - - m1 = new String(iter.next().message()); - assertEquals("Hello Kafka4", m1); - - publisher.close(); - } - - /* - * Similar to the above test, but it sets the first retry index to the last - * possible message index and second index to an out of bound index. The - * expectation is that no messages will be sent to Kafka - */ - @Test - public void validateRetriesWithWrongIndex() throws Exception { - byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8); - InputStream contentStream = new ByteArrayInputStream(testValue); - String topicName = "validateRetriesWithWrongIndex"; - - Properties kafkaProperties = this.buildProducerProperties(); - - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - - // simulates the first re-try - int lastAckedMessageIndex = 3; - PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - - publisher.publish(publishingContext); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - try { - iter.next(); - fail(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - - // simulates the second re-try - lastAckedMessageIndex = 6; - contentStream = new ByteArrayInputStream(testValue); - publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - publisher.publish(publishingContext); - try { - iter.next(); - fail(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - - publisher.close(); - } - - @Test - public void validateWithMultiByteCharactersNoDelimiter() throws Exception { - String data = "å THIS IS MY NEW TEXT.å IT HAS A NEWLINE."; - InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); - String topicName = "validateWithMultiByteCharacters"; - - Properties kafkaProperties = this.buildProducerProperties(); - - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - PublishingContext publishingContext = new PublishingContext(contentStream, topicName); - - publisher.publish(publishingContext); - publisher.close(); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - String r = new String(iter.next().message(), StandardCharsets.UTF_8); - assertEquals(data, r); - } - - @Test - public void validateWithNonDefaultPartitioner() throws Exception { - String data = "fooandbarandbaz"; - InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); - String topicName = "validateWithNonDefaultPartitioner"; - - Properties kafkaProperties = this.buildProducerProperties(); - kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName()); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - PublishingContext publishingContext = new PublishingContext(contentStream, topicName); - publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8)); - - try { - publisher.publish(publishingContext); - // partitioner should be invoked 3 times - assertTrue(TestPartitioner.counter == 3); - publisher.close(); - } finally { - TestPartitioner.counter = 0; - } - } - - private Properties buildProducerProperties() { - Properties kafkaProperties = new Properties(); - kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaLocal.getKafkaPort()); - kafkaProperties.put("auto.create.topics.enable", "true"); - return kafkaProperties; - } - - private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) { - Properties props = new Properties(); - props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort()); - props.put("group.id", "test"); - props.put("consumer.timeout.ms", "500"); - props.put("auto.offset.reset", "smallest"); - ConsumerConfig consumerConfig = new ConsumerConfig(props); - ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); - Map<String, Integer> topicCountMap = new HashMap<>(1); - topicCountMap.put(topic, 1); - Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); - List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); - ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator(); - return iter; - } - - public static class TestPartitioner implements Partitioner { - - static int counter; - - @Override - public void configure(Map<String, ?> configs) { - // nothing to do, test - } - - @Override - public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, - Cluster cluster) { - counter++; - return 0; - } - - @Override - public void close() { - counter = 0; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java deleted file mode 100644 index af0d343..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java +++ /dev/null @@ -1,375 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; - -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; -import org.mockito.Mockito; -import static org.mockito.Mockito.times; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.verify; - -public class PublishKafkaTest { - - @Test - public void validateCustomSerilaizerDeserializerSettings() throws Exception { - PublishKafka_0_10 publishKafka = new PublishKafka_0_10(); - TestRunner runner = TestRunners.newTestRunner(publishKafka); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234"); - runner.setProperty(PublishKafka_0_10.TOPIC, "foo"); - runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3 sec"); - runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - runner.assertValid(); - runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Foo"); - runner.assertNotValid(); - } - - @Test - public void validatePropertiesValidation() throws Exception { - PublishKafka_0_10 publishKafka = new PublishKafka_0_10(); - TestRunner runner = TestRunners.newTestRunner(publishKafka); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234"); - runner.setProperty(PublishKafka_0_10.TOPIC, "foo"); - runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "foo"); - - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("'max.block.ms' validated against 'foo' is invalid")); - } - } - - @Test - public void validateCustomValidation() { - String topicName = "validateCustomValidation"; - PublishKafka_0_10 publishKafka = new PublishKafka_0_10(); - - /* - * Validates that Kerberos principle is required if one of SASL set for - * secirity protocol - */ - TestRunner runner = TestRunners.newTestRunner(publishKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); - try { - runner.run(); - fail(); - } catch (Throwable e) { - assertTrue(e.getMessage().contains("'Kerberos Service Name' is invalid because")); - } - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateSingleCharacterDemarcatedMessages() { - String topicName = "validateSingleCharacterDemarcatedMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(PublishKafka_0_10.KEY, "key1"); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n"); - - runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(7)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() { - String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(PublishKafka_0_10.KEY, "key1"); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName()); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo"); - - runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(7)).send(Mockito.any(ProducerRecord.class)); - - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() { - String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner"; - StubPublishKafka putKafka = new StubPublishKafka(1); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(PublishKafka_0_10.KEY, "key1"); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName()); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo"); - - runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(7)).send(Mockito.any(ProducerRecord.class)); - - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOnSendFailureAndThenResendSuccessA() throws Exception { - String topicName = "validateSendFailureAndThenResendSuccess"; - StubPublishKafka putKafka = new StubPublishKafka(100); - - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(PublishKafka_0_10.KEY, "key1"); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3000 millis"); - - final String text = "Hello World\nGoodbye\nfail\n2"; - runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - putKafka.destroy(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOnSendFailureAndThenResendSuccessB() throws Exception { - String topicName = "validateSendFailureAndThenResendSuccess"; - StubPublishKafka putKafka = new StubPublishKafka(1); - - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(PublishKafka_0_10.KEY, "key1"); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis"); - - final String text = "Hello World\nGoodbye\nfail\n2"; - runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception { - String topicName = "validateSendFailureAndThenResendSuccess"; - StubPublishKafka putKafka = new StubPublishKafka(100); - - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(PublishKafka_0_10.KEY, "key1"); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis"); - - final String text = "futurefail\nHello World\nGoodbye\n2"; - runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0); - assertNotNull(ff); - runner.enqueue(ff); - - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - // 6 sends due to duplication - verify(producer, times(5)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception { - String topicName = "validateSendFailureAndThenResendSuccess"; - StubPublishKafka putKafka = new StubPublishKafka(100); - - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(PublishKafka_0_10.KEY, "key1"); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis"); - - final String text = "Hello World\nGoodbye\nfuturefail\n2"; - runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0); - assertNotNull(ff); - runner.enqueue(ff); - - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - // 6 sends due to duplication - verify(producer, times(6)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateDemarcationIntoEmptyMessages() { - String topicName = "validateDemarcationIntoEmptyMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - final TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(PublishKafka_0_10.KEY, "key1"); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n"); - - final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8); - runner.enqueue(bytes); - runner.run(1); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateComplexRightPartialDemarcatedMessages() { - String topicName = "validateComplexRightPartialDemarcatedMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "å <å WILDSTUFFå >å "); - - runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(3)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateComplexLeftPartialDemarcatedMessages() { - String topicName = "validateComplexLeftPartialDemarcatedMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "å <å WILDSTUFFå >å "); - - runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >å <å WILDSTUFFå >å ".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateComplexPartialMatchDemarcatedMessages() { - String topicName = "validateComplexPartialMatchDemarcatedMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "å <å WILDSTUFFå >å "); - - runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDBOOMSTUFFå >å ".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(2)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @Test - public void validateUtf8Key() { - String topicName = "validateUtf8Key"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.KEY, "${myKey}"); - - final Map<String, String> attributes = Collections.singletonMap("myKey", "key1"); - runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes); - runner.run(1); - - runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1); - final Map<Object, Object> msgs = putKafka.getMessagesSent(); - assertEquals(1, msgs.size()); - final byte[] msgKey = (byte[]) msgs.keySet().iterator().next(); - assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey)); - } - - @Test - public void validateHexKey() { - String topicName = "validateUtf8Key"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka_0_10.TOPIC, topicName); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, PublishKafka_0_10.HEX_ENCODING); - runner.setProperty(PublishKafka_0_10.KEY, "${myKey}"); - - final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931"); - runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes); - runner.run(1); - - runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1); - final Map<Object, Object> msgs = putKafka.getMessagesSent(); - assertEquals(1, msgs.size()); - final byte[] msgKey = (byte[]) msgs.keySet().iterator().next(); - - assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey)); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java deleted file mode 100644 index 76c29cd..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -import java.io.InputStream; -import java.nio.charset.StandardCharsets; - -import org.junit.Test; - -public class PublishingContextTest { - - @Test - public void failInvalidConstructorArgs() { - try { - new PublishingContext(null, null); - fail(); - } catch (IllegalArgumentException e) { - // success - } - try { - new PublishingContext(mock(InputStream.class), null); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - try { - new PublishingContext(mock(InputStream.class), ""); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - try { - new PublishingContext(mock(InputStream.class), "mytopic", -3); - fail(); - } catch (IllegalArgumentException e) { - // success - } - } - - @Test - public void validateFullSetting() { - PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic", 3); - publishingContext.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8)); - publishingContext.setKeyBytes("key".getBytes(StandardCharsets.UTF_8)); - - assertEquals("delimiter", new String(publishingContext.getDelimiterBytes(), StandardCharsets.UTF_8)); - assertEquals("key", new String(publishingContext.getKeyBytes(), StandardCharsets.UTF_8)); - assertEquals("topic", publishingContext.getTopic()); - assertEquals("topic: 'topic'; delimiter: 'delimiter'", publishingContext.toString()); - } - - @Test - public void validateOnlyOnceSetPerInstance() { - PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic"); - publishingContext.setKeyBytes(new byte[]{0}); - try { - publishingContext.setKeyBytes(new byte[]{0}); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - publishingContext.setDelimiterBytes(new byte[]{0}); - try { - publishingContext.setDelimiterBytes(new byte[]{0}); - fail(); - } catch (IllegalArgumentException e) { - // success - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java deleted file mode 100644 index c009014..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.lang.reflect.Field; -import static org.mockito.Mockito.when; - -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS; -import org.mockito.Mockito; -import static org.mockito.Mockito.mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -public class StubPublishKafka extends PublishKafka_0_10 { - - private volatile Producer<byte[], byte[]> producer; - - private volatile boolean failed; - - private final int ackCheckSize; - - private final ExecutorService executor = Executors.newCachedThreadPool(); - private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>(); - - StubPublishKafka(int ackCheckSize) { - this.ackCheckSize = ackCheckSize; - } - - public Producer<byte[], byte[]> getProducer() { - return producer; - } - - public void destroy() { - this.executor.shutdownNow(); - } - - public Map<Object, Object> getMessagesSent() { - return new HashMap<>(msgsSent); - } - - @SuppressWarnings("unchecked") - @Override - protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) - throws ProcessException { - final Map<String, String> kafkaProperties = new HashMap<>(); - KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties); - kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - KafkaPublisher publisher; - try { - Field f = PublishKafka_0_10.class.getDeclaredField("brokers"); - f.setAccessible(true); - f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue()); - publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class); - publisher.setAckWaitTime(15000); - producer = mock(Producer.class); - this.instrumentProducer(producer, false); - Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer"); - kf.setAccessible(true); - kf.set(publisher, producer); - - Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog"); - componentLogF.setAccessible(true); - componentLogF.set(publisher, mock(ComponentLog.class)); - - Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize"); - ackCheckSizeField.setAccessible(true); - ackCheckSizeField.set(publisher, this.ackCheckSize); - } catch (Exception e) { - e.printStackTrace(); - throw new IllegalStateException(e); - } - return publisher; - } - - @SuppressWarnings("unchecked") - private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) { - - when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() { - @Override - public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable { - final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class); - if (record != null && record.key() != null) { - msgsSent.put(record.key(), record.value()); - } - - String value = new String(record.value(), StandardCharsets.UTF_8); - if ("fail".equals(value) && !StubPublishKafka.this.failed) { - StubPublishKafka.this.failed = true; - throw new RuntimeException("intentional"); - } - Future<RecordMetadata> future = executor.submit(new Callable<RecordMetadata>() { - @Override - public RecordMetadata call() throws Exception { - if ("futurefail".equals(value) && !StubPublishKafka.this.failed) { - StubPublishKafka.this.failed = true; - throw new TopicAuthorizationException("Unauthorized"); - } else { - TopicPartition partition = new TopicPartition("foo", 0); - RecordMetadata meta = new RecordMetadata(partition, 0, 0); - return meta; - } - } - }); - return future; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java new file mode 100644 index 0000000..e54a10c --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; + +import org.apache.nifi.util.MockFlowFile; +import org.junit.Assert; +import org.junit.Test; + +public class TestInFlightMessageTracker { + + @Test(timeout = 5000L) + public void testAwaitCompletionWhenComplete() throws InterruptedException, TimeoutException { + final MockFlowFile flowFile = new MockFlowFile(1L); + + final InFlightMessageTracker tracker = new InFlightMessageTracker(); + tracker.incrementSentCount(flowFile); + + verifyNotComplete(tracker); + + tracker.incrementSentCount(flowFile); + verifyNotComplete(tracker); + + tracker.incrementAcknowledgedCount(flowFile); + verifyNotComplete(tracker); + + tracker.incrementAcknowledgedCount(flowFile); + tracker.awaitCompletion(1L); + } + + @Test(timeout = 5000L) + public void testAwaitCompletionWhileWaiting() throws InterruptedException, ExecutionException { + final MockFlowFile flowFile = new MockFlowFile(1L); + + final InFlightMessageTracker tracker = new InFlightMessageTracker(); + tracker.incrementSentCount(flowFile); + + verifyNotComplete(tracker); + + tracker.incrementSentCount(flowFile); + verifyNotComplete(tracker); + + final ExecutorService exec = Executors.newFixedThreadPool(1); + final Future<?> future = exec.submit(() -> { + try { + tracker.awaitCompletion(10000L); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + tracker.incrementAcknowledgedCount(flowFile); + tracker.incrementAcknowledgedCount(flowFile); + + future.get(); + } + + private void verifyNotComplete(final InFlightMessageTracker tracker) throws InterruptedException { + try { + tracker.awaitCompletion(10L); + Assert.fail("Expected timeout"); + } catch (final TimeoutException te) { + // expected + } + } + +}