Repository: storm Updated Branches: refs/heads/master 4966d7a69 -> 3f84dcca0
STORM-2658: Extract storm-kafka-client examples to storm-kafka-client-examples Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9b84248d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9b84248d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9b84248d Branch: refs/heads/master Commit: 9b84248d4c496340f60253bbcabae1b097d4dd23 Parents: 3f386c9 Author: Stig Rohde Døssing <s...@apache.org> Authored: Tue Jul 25 17:33:37 2017 +0200 Committer: Stig Rohde Døssing <s...@apache.org> Committed: Tue Aug 1 15:37:54 2017 +0200 ---------------------------------------------------------------------- docs/storm-kafka-client.md | 49 +------- .../storm-kafka-client-examples/README.markdown | 10 ++ examples/storm-kafka-client-examples/pom.xml | 8 +- .../kafka/spout/test/KafkaSpoutTestBolt.java | 49 ++++++++ .../test/KafkaSpoutTopologyMainNamedTopics.java | 106 +++++++++++++++++ .../KafkaSpoutTopologyMainWildcardTopics.java | 61 ++++++++++ .../TridentKafkaClientWordCountNamedTopics.java | 74 ++++++------ ...identKafkaClientWordCountWildcardTopics.java | 5 +- .../KafkaSpoutTopologyMainNamedTopicsLocal.java | 66 +++++++++++ ...fkaSpoutTopologyMainWildcardTopicsLocal.java | 29 +++++ external/storm-kafka-client/pom.xml | 3 - .../storm/kafka/spout/KafkaSpoutCommitTest.java | 7 +- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 7 +- .../kafka/spout/KafkaSpoutRebalanceTest.java | 9 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 7 +- .../kafka/spout/MaxUncommittedOffsetTest.java | 7 +- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 7 +- .../SpoutWithMockedConsumerSetupHelper.java | 1 - .../SingleTopicKafkaSpoutConfiguration.java | 84 -------------- .../SingleTopicKafkaSpoutConfiguration.java | 66 +++++++++++ .../kafka/spout/test/KafkaSpoutTestBolt.java | 49 -------- .../test/KafkaSpoutTopologyMainNamedTopics.java | 115 ------------------- .../KafkaSpoutTopologyMainWildcardTopics.java | 58 ---------- 23 files changed, 460 insertions(+), 417 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/docs/storm-kafka-client.md ---------------------------------------------------------------------- diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md index 99b9ae5..a1814b4 100644 --- a/docs/storm-kafka-client.md +++ b/docs/storm-kafka-client.md @@ -217,6 +217,9 @@ final Stream spoutStream = tridentTopology.newStream("kafkaSpout", Trident does not support multiple streams and will ignore any streams set for output. If however the Fields are not identical for each output topic it will throw an exception and not continue. +#### Example topologies +Example topologies using storm-kafka-client can be found in the examples/storm-kafka-client-examples directory included in the Storm source or binary distributions. + ### Custom RecordTranslators (ADVANCED) In most cases the built in SimpleRecordTranslator and ByTopicRecordTranslator should cover your use case. If you do run into a situation where you need a custom one @@ -244,52 +247,6 @@ otherwise trident can throw exceptions. By default the KafkaSpout instances will be assigned partitions using a round robin strategy. If you need to customize partition assignment, you must implement the `ManualPartitioner` interface. The implementation can be passed to the `ManualPartitionSubscription` constructor, and the `Subscription` can then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Please take care when supplying a custom implementation, since an incorrect `ManualPartitioner` implementation could leave some partitions unread, or concurrently read by multiple spout instances. See the `RoundRobinManualPartitioner` for an example of how to implement this functionality. -## Use the Maven Shade Plugin to Build the Uber Jar - -Add the following to `REPO_HOME/storm/external/storm-kafka-client/pom.xml` - -```xml -<plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.4.1</version> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass> - </transformer> - </transformers> - </configuration> - </execution> - </executions> -</plugin> -``` - -create the uber jar by running the command: - -`mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml` - -This will create the uber jar file with the name and location matching the following pattern: - -`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar` - -### Run Storm Topology - -Copy the file `REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-*.jar` to `STORM_HOME/extlib` - -Using the Kafka command line tools create three topics [test, test1, test2] and use the Kafka console producer to populate the topics with some data - -Execute the command `STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm/target/storm-kafka-client-*.jar org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain` - -With the debug level logs enabled it is possible to see the messages of each topic being redirected to the appropriate Bolt as defined -by the streams defined and choice of shuffle grouping. - ## Using storm-kafka-client with different versions of kafka Storm-kafka-client's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/README.markdown ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/README.markdown b/examples/storm-kafka-client-examples/README.markdown new file mode 100644 index 0000000..74c1366 --- /dev/null +++ b/examples/storm-kafka-client-examples/README.markdown @@ -0,0 +1,10 @@ +## Usage +This module contains example topologies demonstrating storm-kafka-client spout and Trident usage. + +The module is built by running `mvn clean package -Dprovided.scope=compile`. This will generate the `target/storm-kafka-client-examples-VERSION.jar` file. The jar contains all dependencies and can be submitted to Storm via the Storm CLI, e.g. +``` +storm jar storm-kafka-client-examples-2.0.0-SNAPSHOT.jar org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMainNamedTopics +``` +will submit the topologies set up by KafkaSpoutTopologyMainNamedTopics to Storm. + +Note that this example produces a jar containing all dependencies for ease of use. In a production environment you may want to reduce the jar size by extracting some dependencies (e.g. org.apache.kafka:kafka-clients) from the jar. You can do this by setting the dependencies you don't want to include in the jars to `provided` scope, and then using the --artifacts flag for the storm jar command to fetch the dependencies when submitting the topology. See the [CLI documentation](http://storm.apache.org/releases/2.0.0-SNAPSHOT/Command-line-client.html) for syntax. http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml index 9c5796d..382942f 100644 --- a/examples/storm-kafka-client-examples/pom.xml +++ b/examples/storm-kafka-client-examples/pom.xml @@ -87,6 +87,12 @@ <version>${storm.kafka.client.version}</version> <scope>${provided.scope}</scope> </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-server</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -136,7 +142,7 @@ <artifactId>maven-checkstyle-plugin</artifactId> <!--Note - the version would be inherited--> <configuration> - <maxAllowedViolations>2</maxAllowedViolations> + <maxAllowedViolations>0</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java new file mode 100644 index 0000000..8ace45c --- /dev/null +++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.test; + +import java.util.Map; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaSpoutTestBolt extends BaseRichBolt { + protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestBolt.class); + private OutputCollector collector; + + @Override + public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + LOG.debug("input = [" + input + "]"); + collector.ack(input); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java new file mode 100644 index 0000000..0731069 --- /dev/null +++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.test; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.storm.Config; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.spout.ByTopicRecordTranslator; +import org.apache.storm.kafka.spout.KafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.kafka.spout.KafkaSpoutRetryService; +import org.apache.storm.kafka.trident.KafkaProducerTopology; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +public class KafkaSpoutTopologyMainNamedTopics { + + private static final String TOPIC_2_STREAM = "test_2_stream"; + private static final String TOPIC_0_1_STREAM = "test_0_1_stream"; + private static final String KAFKA_LOCAL_BROKER = "localhost:9092"; + public static final String TOPIC_0 = "kafka-spout-test"; + public static final String TOPIC_1 = "kafka-spout-test-1"; + public static final String TOPIC_2 = "kafka-spout-test-2"; + + public static void main(String[] args) throws Exception { + new KafkaSpoutTopologyMainNamedTopics().runMain(args); + } + + protected void runMain(String[] args) throws Exception { + final String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER; + System.out.println("Running with broker url: " + brokerUrl); + + Config tpConf = getConfig(); + + // Producers. This is just to get some data in Kafka, normally you would be getting this data from elsewhere + StormSubmitter.submitTopology(TOPIC_0 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_0)); + StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1)); + StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2)); + + //Consumer. Sets up a topology that reads the given Kafka spouts and logs the received messages + StormSubmitter.submitTopology("storm-kafka-client-spout-test", tpConf, getTopologyKafkaSpout(getKafkaSpoutConfig(brokerUrl))); + } + + protected Config getConfig() { + Config config = new Config(); + config.setDebug(true); + return config; + } + + protected StormTopology getTopologyKafkaSpout(KafkaSpoutConfig<String, String> spoutConfig) { + final TopologyBuilder tp = new TopologyBuilder(); + tp.setSpout("kafka_spout", new KafkaSpout<>(spoutConfig), 1); + tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()) + .shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM) + .shuffleGrouping("kafka_spout", TOPIC_2_STREAM); + tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM); + return tp.createTopology(); + } + + protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers) { + ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>( + (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()), + new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM); + trans.forTopic(TOPIC_2, + (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()), + new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM); + return KafkaSpoutConfig.builder(bootstrapServers, new String[]{TOPIC_0, TOPIC_1, TOPIC_2}) + .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") + .setRetry(getRetryService()) + .setRecordTranslator(trans) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .build(); + } + + protected KafkaSpoutRetryService getRetryService() { + return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500), + TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java new file mode 100644 index 0000000..2528c00 --- /dev/null +++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.test; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; + +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.spout.KafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMainNamedTopics { + + private static final String STREAM = "test_wildcard_stream"; + private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("kafka-spout-test-[1|2]"); + + public static void main(String[] args) throws Exception { + new KafkaSpoutTopologyMainWildcardTopics().runMain(args); + } + + @Override + protected StormTopology getTopologyKafkaSpout(KafkaSpoutConfig<String, String> spoutConfig) { + final TopologyBuilder tp = new TopologyBuilder(); + tp.setSpout("kafka_spout", new KafkaSpout<>(spoutConfig), 1); + tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM); + return tp.createTopology(); + } + + @Override + protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers) { + return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_WILDCARD_PATTERN) + .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") + .setRetry(getRetryService()) + .setRecordTranslator((r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()), + new Fields("topic", "partition", "offset", "key", "value"), STREAM) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java index 886d15d..ecc52b4 100644 --- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java +++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java @@ -42,72 +42,68 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class TridentKafkaClientWordCountNamedTopics { + private static final String TOPIC_1 = "test-trident"; private static final String TOPIC_2 = "test-trident-1"; private static final String KAFKA_LOCAL_BROKER = "localhost:9092"; - private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() { - return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig()); + private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, String> spoutConfig) { + return new KafkaTridentSpoutOpaque<>(spoutConfig); } private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc(); /** - * Needs to be serializable + * Needs to be serializable. */ private static class JustValueFunc implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable { + @Override public List<Object> apply(ConsumerRecord<String, String> record) { return new Values(record.value()); } } - protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() { - return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, TOPIC_2) - .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime()) - .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200) - .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str")) - .setRetry(newRetryService()) - .setOffsetCommitPeriodMs(10_000) - .setFirstPollOffsetStrategy(EARLIEST) - .setMaxUncommittedOffsets(250) - .build(); + protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String bootstrapServers) { + return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2) + .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime()) + .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200) + .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str")) + .setRetry(newRetryService()) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .build(); } protected KafkaSpoutRetryService newRetryService() { return new KafkaSpoutRetryExponentialBackoff(new TimeInterval(500L, TimeUnit.MICROSECONDS), - TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); + TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); } public static void main(String[] args) throws Exception { new TridentKafkaClientWordCountNamedTopics().run(args); } - protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException { - if (args.length > 0 && Arrays.stream(args).anyMatch(option -> option.equals("-h"))) { - System.out.printf("Usage: java %s [%s] [%s] [%s] [%s]\n", getClass().getName(), - "broker_host:broker_port", "topic1", "topic2", "topology_name"); - } else { - final String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER; - final String topic1 = args.length > 1 ? args[1] : TOPIC_1; - final String topic2 = args.length > 2 ? args[2] : TOPIC_2; - - System.out.printf("Running with broker_url: [%s], topics: [%s, %s]\n", brokerUrl, topic1, topic2); - - Config tpConf = new Config(); - tpConf.setDebug(true); - tpConf.setMaxSpoutPending(5); - - // Producers - StormSubmitter.submitTopology(topic1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic1)); - StormSubmitter.submitTopology(topic2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2)); - // Consumer - StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque())); - - // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log - Thread.sleep(2000); - DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS); - } + protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, + AuthorizationException, InterruptedException { + final String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER; + System.out.println("Running with broker url " + brokerUrl); + + Config tpConf = new Config(); + tpConf.setDebug(true); + tpConf.setMaxSpoutPending(5); + + // Producers + StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1)); + StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2)); + // Consumer + StormSubmitter.submitTopology("topics-consumer", tpConf, + TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque(newKafkaSpoutConfig(brokerUrl)))); + + // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log + Thread.sleep(2000); + DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS); System.exit(0); // Kill all the non daemon threads } } http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java index 0be3127..865ad82 100644 --- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java +++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java @@ -30,8 +30,9 @@ import org.apache.storm.tuple.Values; public class TridentKafkaClientWordCountWildcardTopics extends TridentKafkaClientWordCountNamedTopics { private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("test-trident(-1)?"); - protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() { - return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_WILDCARD_PATTERN) + @Override + protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig(String bootstrapServers) { + return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_WILDCARD_PATTERN) .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200) .setRecordTranslator((r) -> new Values(r.value()), new Fields("str")) http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopicsLocal.java ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopicsLocal.java b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopicsLocal.java new file mode 100644 index 0000000..05a0dd4 --- /dev/null +++ b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopicsLocal.java @@ -0,0 +1,66 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.test; + +import static org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMainNamedTopics.TOPIC_0; +import static org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMainNamedTopics.TOPIC_1; +import static org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMainNamedTopics.TOPIC_2; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.kafka.trident.KafkaProducerTopology; + +public class KafkaSpoutTopologyMainNamedTopicsLocal { + + public static void main(String[] args) throws Exception { + new KafkaSpoutTopologyMainNamedTopicsLocal().runExample(); + } + + protected void runExample() throws Exception { + String brokerUrl = "localhost:9092"; + KafkaSpoutTopologyMainNamedTopics example = getTopology(); + Config tpConf = example.getConfig(); + + LocalCluster localCluster = new LocalCluster(); + // Producers. This is just to get some data in Kafka, normally you would be getting this data from elsewhere + localCluster.submitTopology(TOPIC_0 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_0)); + localCluster.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1)); + localCluster.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2)); + + //Consumer. Sets up a topology that reads the given Kafka spouts and logs the received messages + localCluster.submitTopology("storm-kafka-client-spout-test", tpConf, example.getTopologyKafkaSpout(example.getKafkaSpoutConfig(brokerUrl))); + + stopWaitingForInput(); + } + + protected KafkaSpoutTopologyMainNamedTopics getTopology() { + return new KafkaSpoutTopologyMainNamedTopics(); + } + + protected void stopWaitingForInput() { + try { + System.out.println("PRESS ENTER TO STOP"); + new BufferedReader(new InputStreamReader(System.in)).readLine(); + System.exit(0); + } catch (IOException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopicsLocal.java ---------------------------------------------------------------------- diff --git a/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopicsLocal.java b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopicsLocal.java new file mode 100644 index 0000000..336beba --- /dev/null +++ b/examples/storm-kafka-client-examples/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopicsLocal.java @@ -0,0 +1,29 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.test; + +public class KafkaSpoutTopologyMainWildcardTopicsLocal extends KafkaSpoutTopologyMainNamedTopicsLocal { + + public static void main(String[] args) throws Exception { + new KafkaSpoutTopologyMainWildcardTopicsLocal().runExample(); + } + + protected KafkaSpoutTopologyMainNamedTopics getTopology() { + return new KafkaSpoutTopologyMainWildcardTopics(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index 6db04a0..2f17ea1 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -80,18 +80,15 @@ <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <scope>test</scope> - <version>${mockito.version}</version> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> - <version>1.3</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> - <version>1.3</version> <scope>test</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index 7258fe2..bdae043 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -15,7 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; @@ -29,7 +28,7 @@ import java.util.Map; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -41,6 +40,8 @@ import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class KafkaSpoutCommitTest { private final long offsetCommitPeriodMs = 2_000; @@ -57,7 +58,7 @@ public class KafkaSpoutCommitTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - spoutConfig = getKafkaSpoutConfigBuilder(-1) + spoutConfig = createKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); consumerMock = mock(KafkaConsumer.class); http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 8e6d390..756ea1a 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -15,7 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.inOrder; @@ -35,7 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -45,6 +44,8 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class KafkaSpoutEmitTest { private final long offsetCommitPeriodMs = 2_000; @@ -57,7 +58,7 @@ public class KafkaSpoutEmitTest { @Before public void setUp() { - spoutConfig = getKafkaSpoutConfigBuilder(-1) + spoutConfig = createKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(); consumerMock = mock(KafkaConsumer.class); http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index 23630a6..635e2d2 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -15,7 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertThat; @@ -41,7 +40,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.subscription.Subscription; import org.apache.storm.spout.SpoutOutputCollector; @@ -54,6 +53,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class KafkaSpoutRebalanceTest { @Captor @@ -126,7 +127,7 @@ public class KafkaSpoutRebalanceTest { doNothing() .when(subscriptionMock) .subscribe(any(), rebalanceListenerCapture.capture(), any()); - KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) + KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .build(), consumerFactory); String topic = SingleTopicKafkaSpoutConfiguration.TOPIC; @@ -163,7 +164,7 @@ public class KafkaSpoutRebalanceTest { .when(subscriptionMock) .subscribe(any(), rebalanceListenerCapture.capture(), any()); KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class); - KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1) + KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1) .setOffsetCommitPeriodMs(10) .setRetry(retryServiceMock) .build(), consumerFactory); http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index 078f7a1..ec557e7 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -15,7 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; @@ -33,7 +32,7 @@ import java.util.Map; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; -import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -45,6 +44,8 @@ import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class KafkaSpoutRetryLimitTest { private final long offsetCommitPeriodMs = 2_000; @@ -65,7 +66,7 @@ public class KafkaSpoutRetryLimitTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - spoutConfig = getKafkaSpoutConfigBuilder(-1) + spoutConfig = createKafkaSpoutConfigBuilder(-1) .setOffsetCommitPeriodMs(offsetCommitPeriodMs) .setRetry(ZERO_RETRIES_RETRY_SERVICE) .build(); http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java index 261c654..c361e24 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java @@ -15,7 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.everyItem; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -38,7 +37,7 @@ import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.storm.kafka.KafkaUnitRule; -import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.utils.Time; @@ -48,6 +47,8 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class MaxUncommittedOffsetTest { @Rule @@ -61,7 +62,7 @@ public class MaxUncommittedOffsetTest { private final int maxUncommittedOffsets = 10; private final int maxPollRecords = 5; private final int initialRetryDelaySecs = 60; - private final KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + private final KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords) .setMaxUncommittedOffsets(maxUncommittedOffsets) http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index 6b92de8..35f95cd 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -17,7 +17,6 @@ */ package org.apache.storm.kafka.spout; -import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Matchers.any; @@ -45,7 +44,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.storm.kafka.KafkaUnitRule; -import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; +import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory; import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault; import org.apache.storm.spout.SpoutOutputCollector; @@ -60,6 +59,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.MockitoAnnotations; +import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; + public class SingleTopicKafkaSpoutTest { @Rule @@ -80,7 +81,7 @@ public class SingleTopicKafkaSpoutTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) + KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort()) .setOffsetCommitPeriodMs(commitOffsetPeriodMs) .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java index 5f931bb..aa65d0f 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -17,7 +17,6 @@ package org.apache.storm.kafka.spout; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java deleted file mode 100644 index d2f38b0..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.spout.builders; - -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES; -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.storm.Config; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.kafka.spout.KafkaSpout; -import org.apache.storm.kafka.spout.KafkaSpoutConfig; -import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; -import org.apache.storm.kafka.spout.KafkaSpoutRetryService; -import org.apache.storm.kafka.spout.subscription.Subscription; -import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -public class SingleTopicKafkaSpoutConfiguration { - - public static final String STREAM = "test_stream"; - public static final String TOPIC = "test"; - - /** - * Retry in a tight loop (keep unit tests fasts). - */ - public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = - new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), - DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); - - public static Config getConfig() { - Config config = new Config(); - config.setDebug(true); - return config; - } - - public static StormTopology getTopologyKafkaSpout(int port) { - final TopologyBuilder tp = new TopologyBuilder(); - tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1); - tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM); - return tp.createTopology(); - } - - public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(int port) { - return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)); - } - - public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(Subscription subscription, int port) { - return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription)); - } - - private static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) { - return config.setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()), - new Fields("topic", "key", "value"), STREAM) - .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") - .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5) - .setRetry(getRetryService()) - .setOffsetCommitPeriodMs(10_000) - .setFirstPollOffsetStrategy(EARLIEST) - .setMaxUncommittedOffsets(250) - .setPollTimeoutMs(1000); - } - - protected static KafkaSpoutRetryService getRetryService() { - return UNIT_TEST_RETRY_SERVICE; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java new file mode 100644 index 0000000..2bf1f36 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.spout.config.builder; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; +import org.apache.storm.kafka.spout.KafkaSpoutRetryService; +import org.apache.storm.kafka.spout.subscription.Subscription; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +public class SingleTopicKafkaSpoutConfiguration { + + public static final String STREAM = "test_stream"; + public static final String TOPIC = "test"; + + /** + * Retry in a tight loop (keep unit tests fasts). + */ + public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = + new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), + DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); + + public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) { + return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)); + } + + public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) { + return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription)); + } + + private static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) { + return config.setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()), + new Fields("topic", "key", "value"), STREAM) + .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") + .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5) + .setRetry(getRetryService()) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .setPollTimeoutMs(1000); + } + + protected static KafkaSpoutRetryService getRetryService() { + return UNIT_TEST_RETRY_SERVICE; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java deleted file mode 100644 index 8ace45c..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout.test; - -import java.util.Map; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseRichBolt; -import org.apache.storm.tuple.Tuple; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaSpoutTestBolt extends BaseRichBolt { - protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestBolt.class); - private OutputCollector collector; - - @Override - public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple input) { - LOG.debug("input = [" + input + "]"); - collector.ack(input); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java deleted file mode 100644 index 50991e8..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout.test; - -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.kafka.spout.ByTopicRecordTranslator; -import org.apache.storm.kafka.spout.KafkaSpout; -import org.apache.storm.kafka.spout.KafkaSpoutConfig; -import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; -import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; -import org.apache.storm.kafka.spout.KafkaSpoutRetryService; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -public class KafkaSpoutTopologyMainNamedTopics { - private static final String TOPIC_2_STREAM = "test_2_stream"; - private static final String TOPIC_0_1_STREAM = "test_0_1_stream"; - private static final String[] TOPICS = new String[]{"test","test1","test2"}; - - public static void main(String[] args) throws Exception { - new KafkaSpoutTopologyMainNamedTopics().runMain(args); - } - - protected void runMain(String[] args) throws Exception { - if (args.length == 0) { - submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig()); - } else { - submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig()); - } - } - - protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws Exception { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", config, topology); - stopWaitingForInput(); - } - - protected void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception { - StormSubmitter.submitTopology(arg, config, topology); - } - - protected void stopWaitingForInput() { - try { - System.out.println("PRESS ENTER TO STOP"); - new BufferedReader(new InputStreamReader(System.in)).readLine(); - System.exit(0); - } catch (IOException e) { - e.printStackTrace(); - } - } - - protected Config getConfig() { - Config config = new Config(); - config.setDebug(true); - return config; - } - - protected StormTopology getTopologyKafkaSpout() { - final TopologyBuilder tp = new TopologyBuilder(); - tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1); - tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()) - .shuffleGrouping("kafka_spout", TOPIC_0_1_STREAM) - .shuffleGrouping("kafka_spout", TOPIC_2_STREAM); - tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", TOPIC_2_STREAM); - return tp.createTopology(); - } - - protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() { - ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>( - (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()), - new Fields("topic", "partition", "offset", "key", "value"), TOPIC_0_1_STREAM); - trans.forTopic(TOPICS[2], - (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()), - new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM); - return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPICS) - .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") - .setRetry(getRetryService()) - .setRecordTranslator(trans) - .setOffsetCommitPeriodMs(10_000) - .setFirstPollOffsetStrategy(EARLIEST) - .setMaxUncommittedOffsets(250) - .build(); - } - - protected KafkaSpoutRetryService getRetryService() { - return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500), - TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/9b84248d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java deleted file mode 100644 index d50e2ab..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout.test; - -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; - -import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.kafka.spout.KafkaSpout; -import org.apache.storm.kafka.spout.KafkaSpoutConfig; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; - -public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMainNamedTopics { - private static final String STREAM = "test_wildcard_stream"; - private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("test[1|2]"); - - public static void main(String[] args) throws Exception { - new KafkaSpoutTopologyMainWildcardTopics().runMain(args); - } - - protected StormTopology getTopologyKafkaSpout() { - final TopologyBuilder tp = new TopologyBuilder(); - tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig()), 1); - tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM); - return tp.createTopology(); - } - - protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() { - return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_WILDCARD_PATTERN) - .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup") - .setRetry(getRetryService()) - .setRecordTranslator((r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()), - new Fields("topic", "partition", "offset", "key", "value"), STREAM) - .setOffsetCommitPeriodMs(10_000) - .setFirstPollOffsetStrategy(EARLIEST) - .setMaxUncommittedOffsets(250) - .build(); - } -}