I am wrapping Storm's KafkaSpout implementation in order to consume from Kafka and emit tuples to a named stream instead of the 'default' stream. Is this possible? Note that my configuration successfully consumes from Kafka via the KafkaSpout and emits to the 'default' stream if I do not override the declareOutputFields() method. Details below.
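For context, this is my understanding of how a plain Storm spout declares and emits to a named stream (a simplified sketch; the class name, stream id, and field name here are just illustrative, not from my topology):

    import java.util.Map;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class NamedStreamSpout extends BaseRichSpout {
        public static final String STREAM_ID = "my_stream"; // illustrative stream name
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            // emit explicitly to the named stream, not the default stream
            collector.emit(STREAM_ID, new Values("hello"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // declare the output fields on the named stream
            declarer.declareStream(STREAM_ID, new Fields("str"));
        }
    }

My question is whether this same pattern can be applied by wrapping KafkaSpout, given that KafkaSpout controls its own emit calls internally.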
Thanks in advance for your help, -Dan

Details: The extended class CSKafkaSpout overrides declareOutputFields(OutputFieldsDeclarer declarer), but the override does not appear to be called. A consolidated topology and the CSKafkaSpout wrapper class I am developing are as follows (the lines most relevant to the question are marked with "// <--"):

    // Consolidated Topology class
    public static void main(String[] args) throws Exception {
        // properties are loaded earlier from a config file (omitted here)
        CSKafkaSpout csKafkaSpout = new CSKafkaSpout(
                properties.getProperty("zookeeper.broker.hosts"),
                properties.getProperty("zookeeper.broker.path"),
                properties.getProperty("kafka.topic.weblogs"),
                properties.getProperty("zookeeper.root.path"),
                properties.getProperty("zookeeper.kafka.id.weblogs"));

        Config topology_config = new Config();
        topology_config.setNumWorkers(Integer.valueOf(properties.getProperty("storm.num.workers")));
        topology_config.setMaxTaskParallelism(Integer.valueOf(properties.getProperty("storm.max.task.parallelism")));
        topology_config.setMessageTimeoutSecs(Integer.valueOf(properties.getProperty("storm.message.timeout.seconds")));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_weblogs", csKafkaSpout)
               .setMaxSpoutPending(Integer.valueOf(properties.getProperty("storm.max.spout.pending")));
        builder.setBolt("printer", new PrinterBolt())
               .shuffleGrouping("kafka_weblogs", csKafkaSpout.getStream()); // <-- subscribe to the named stream

        StormSubmitter.submitTopology("WeblogTopology", topology_config, builder.createTopology());
    }

    // CSKafkaSpout: wrapper for KafkaSpout
    public final class CSKafkaSpout extends storm.kafka.KafkaSpout {

        private static final Logger logger = LoggerFactory.getLogger(CSKafkaSpout.class);

        public static final long STORM_KAFKA_FROM_READ_FROM_CURRENT_OFFSET = kafka.api.OffsetRequest.LatestTime();
        public static final long STORM_KAFKA_FROM_READ_FROM_START = kafka.api.OffsetRequest.EarliestTime();

        private final String brokerZkStr;
        private final String brokerZkPath;
        private final String kafka_topic;
        private final String zkRoot;
        private final String consumer_id;
        private final BrokerHosts broker_hosts;
        private final SpoutConfig spout_config;

        public static final String STREAM_CSKAFKASPOUT = "stream_kafka_spout";

        public String getStream() {
            return STREAM_CSKAFKASPOUT;
        }

        public CSKafkaSpout(String brokerZkStr, String brokerZkPath, String kafka_topic, String zkRoot, String consumer_id) {
            // called first: constructor for KafkaSpout
            super(generateSpoutConfig(brokerZkStr, brokerZkPath, kafka_topic, zkRoot, consumer_id)); // <--
            this.brokerZkStr = brokerZkStr;
            this.brokerZkPath = brokerZkPath;
            this.kafka_topic = kafka_topic;
            this.zkRoot = zkRoot;
            this.consumer_id = consumer_id;
            this.broker_hosts = generateBrokerHosts(this.brokerZkStr, this.brokerZkPath);
            this.spout_config = generateSpoutConfig(this.broker_hosts, this.kafka_topic, this.zkRoot, this.consumer_id);
        }

        // <-- the override in question: it never appears to be invoked
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream(getStream(), this.spout_config.scheme.getOutputFields());
        }

        public BrokerHosts getBrokerHosts() {
            return this.broker_hosts;
        }

        public SpoutConfig getSpoutConfig() {
            return this.spout_config;
        }

        public static BrokerHosts generateBrokerHosts(String brokerZkStr, String brokerZkPath) {
            return new ZkHosts(brokerZkStr, brokerZkPath);
        }

        public static SpoutConfig generateSpoutConfig(String brokerZkStr, String brokerZkPath, String kafka_topic, String zkRoot, String consumer_id) {
            BrokerHosts broker_hosts = generateBrokerHosts(brokerZkStr, brokerZkPath);
            return generateSpoutConfig(broker_hosts, kafka_topic, zkRoot, consumer_id);
        }

        public static SpoutConfig generateSpoutConfig(BrokerHosts broker_hosts, String kafka_topic, String zkRoot, String consumer_id) {
            SpoutConfig spout_config = new SpoutConfig(broker_hosts, kafka_topic, zkRoot, consumer_id);
            // specific overrides for the spout configuration
            spout_config.scheme = new SchemeAsMultiScheme(new StringScheme());
            spout_config.bufferSizeBytes = 1024 * 1024 * 4;
            spout_config.fetchSizeBytes = 1024 * 1024 * 4;
            spout_config.forceFromStart = true;
            spout_config.startOffsetTime = STORM_KAFKA_FROM_READ_FROM_START;
            return spout_config;
        }
    }
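In case it helps, the PrinterBolt wired up above is along these lines (a minimal sketch, not the exact class from my project; I print the source stream id so I can see which stream each tuple actually arrived on):

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class PrinterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // print the source stream id alongside the tuple contents,
            // to confirm which stream the spout actually emitted on
            System.out.println(tuple.getSourceStreamId() + " -> " + tuple.getValues());
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: no output streams
        }
    }

With the declareOutputFields() override removed and the shuffleGrouping pointed back at the default stream, this bolt prints tuples as expected; with the override in place, nothing arrives on "stream_kafka_spout".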