[ https://issues.apache.org/jira/browse/STORM-2225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Joseph Evans resolved STORM-2225.
----------------------------------------
    Resolution: Fixed
 Fix Version/s: 1.1.0
                2.0.0

> Kafka New API make simple things simple
> ---------------------------------------
>
>                 Key: STORM-2225
>                 URL: https://issues.apache.org/jira/browse/STORM-2225
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 2.0.0
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>             Fix For: 2.0.0, 1.1.0
>
>          Time Spent: 24h
>  Remaining Estimate: 0h
>
> The Kafka spouts in storm-kafka-client use the new API and are very
> extensible, but doing very simple things takes way too many lines of code.
> For example, to create a KafkaTridentSpoutOpaque you need the following code
> (from the example).
> {code}
> private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
>     return new KafkaTridentSpoutOpaque<>(new KafkaTridentSpoutManager<>(
>             newKafkaSpoutConfig(newKafkaSpoutStreams())));
> }
>
> private KafkaSpoutConfig<String, String> newKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
>     return new KafkaSpoutConfig.Builder<>(newKafkaConsumerProps(),
>             kafkaSpoutStreams, newTuplesBuilder(), newRetryService())
>             .setOffsetCommitPeriodMs(10_000)
>             .setFirstPollOffsetStrategy(EARLIEST)
>             .setMaxUncommittedOffsets(250)
>             .build();
> }
>
> protected Map<String, Object> newKafkaConsumerProps() {
>     Map<String, Object> props = new HashMap<>();
>     props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
>     props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
>     props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("max.partition.fetch.bytes", 200);
>     return props;
> }
>
> protected KafkaSpoutTuplesBuilder<String, String> newTuplesBuilder() {
>     return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
>             new TopicsTupleBuilder<String, String>(TOPIC_1, TOPIC_2))
>             .build();
> }
>
> protected KafkaSpoutRetryService newRetryService() {
>     return new KafkaSpoutRetryExponentialBackoff(
>             new KafkaSpoutRetryExponentialBackoff.TimeInterval(500L, TimeUnit.MICROSECONDS),
>             KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
>             Integer.MAX_VALUE,
>             KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
> }
>
> protected KafkaSpoutStreams newKafkaSpoutStreams() {
>     return new KafkaSpoutStreamsNamedTopics.Builder(new Fields("str"),
>             new String[]{"test-trident", "test-trident-1"}).build();
> }
>
> protected static class TopicsTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
>     public TopicsTupleBuilder(String... topics) {
>         super(topics);
>     }
>
>     @Override
>     public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
>         return new Values(consumerRecord.value());
>     }
> }
> {code}
> All of this so I can have a Trident spout that reads <String, String> values
> from "localhost:9092" on the topics "test-trident" and "test-trident-1" and
> outputs the value as the field "str".
> I shouldn't need 50 lines of code for something I can explain in 3 lines of
> text. It feels like we need better defaults and less overhead for a lot of
> these things.
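One way to read "better defaults" is a builder that requires only what actually differs between use cases (bootstrap servers, topics, tuple layout) and fills in everything else. The sketch below is a minimal, self-contained illustration of that idea, not the storm-kafka-client API: `SimpleRecord` (a stand-in for Kafka's `ConsumerRecord`), `SimpleSpoutConfig`, its `builder`, `setRecordTranslator`, `setProp` and `setOffsetCommitPeriodMs` methods, the default group id `storm-default-group`, and the 30-second commit period are all hypothetical assumptions. Only the Kafka consumer property names (`bootstrap.servers`, `group.id`, `key.deserializer`, `value.deserializer`) and the `StringDeserializer` class name are real.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for Kafka's ConsumerRecord (only the fields used here).
final class SimpleRecord<K, V> {
    final String topic;
    final K key;
    final V value;

    SimpleRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical config, NOT the storm-kafka-client API: only bootstrap servers
// and topics are required; every other setting starts from a default.
final class SimpleSpoutConfig<K, V> {
    final Map<String, Object> consumerProps;
    final List<String> topics;
    final Function<SimpleRecord<K, V>, List<Object>> translator;
    final List<String> outputFields;
    final long offsetCommitPeriodMs;

    private SimpleSpoutConfig(Builder<K, V> b) {
        consumerProps = Collections.unmodifiableMap(new HashMap<>(b.props));
        topics = Collections.unmodifiableList(Arrays.asList(b.topics));
        translator = b.translator;
        outputFields = b.outputFields;
        offsetCommitPeriodMs = b.offsetCommitPeriodMs;
    }

    // The common case: String keys and values with String deserializers preset.
    static Builder<String, String> builder(String bootstrapServers, String... topics) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "storm-default-group"); // assumed default group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new Builder<>(props, topics);
    }

    static final class Builder<K, V> {
        private final Map<String, Object> props;
        private final String[] topics;
        // Default tuple layout: just the record value, in a field named "value".
        private Function<SimpleRecord<K, V>, List<Object>> translator =
                r -> Collections.<Object>singletonList(r.value);
        private List<String> outputFields = Collections.singletonList("value");
        private long offsetCommitPeriodMs = 30_000; // assumed default

        private Builder(Map<String, Object> props, String[] topics) {
            this.props = props;
            this.topics = topics.clone();
        }

        // Override the tuple layout only when the default does not fit.
        Builder<K, V> setRecordTranslator(Function<SimpleRecord<K, V>, List<Object>> f,
                                          String... fields) {
            translator = f;
            outputFields = Collections.unmodifiableList(Arrays.asList(fields.clone()));
            return this;
        }

        // Escape hatch for any other raw Kafka consumer property.
        Builder<K, V> setProp(String key, Object value) {
            props.put(key, value);
            return this;
        }

        Builder<K, V> setOffsetCommitPeriodMs(long ms) {
            offsetCommitPeriodMs = ms;
            return this;
        }

        SimpleSpoutConfig<K, V> build() {
            return new SimpleSpoutConfig<>(this);
        }
    }
}

class SimpleSpoutConfigDemo {
    public static void main(String[] args) {
        // The spout described in the issue: String values from two topics, emitted as "str".
        SimpleSpoutConfig<String, String> conf =
                SimpleSpoutConfig.builder("127.0.0.1:9092", "test-trident", "test-trident-1")
                        .setRecordTranslator(r -> Collections.<Object>singletonList(r.value), "str")
                        .build();
        System.out.println(conf.outputFields + " " + conf.topics);
        // prints "[str] [test-trident, test-trident-1]"
    }
}
```

In this sketch the common case shrinks to a single builder chain, while anything beyond the defaults (for example `max.partition.fetch.bytes` or a different commit period) still goes through an explicit setter, so the flexibility of the verbose API is kept rather than removed.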