I am wrapping Storm's implementation of KafkaSpout in order to consume and
emit tuples from Kafka to a particular stream instead of the 'default'
stream. Is this possible?  Note my configuration successfully consumes from
Kafka via the KafkaSpout and emits to the 'default' stream if I do not
override the declareOutputFields() method.  Details below.

Thanks in advance for your help,  -Dan

Details:

The extended class CSKafkaSpout overrides
declareOutputFields(OutputFieldsDeclarer declarer), but the method does not
appear to take effect. A consolidated Topology class and the wrapper class
CSKafkaSpout that I'm developing are as follows:

//  Consolidated Topology class
public static void main(String[] args) throws Exception {
        CSKafkaSpout csKafkaSpout = new CSKafkaSpout(
                properties.getProperty("zookeeper.broker.hosts"),
                properties.getProperty("zookeeper.broker.path"),
                properties.getProperty("kafka.topic.weblogs"),
                properties.getProperty("zookeeper.root.path"),
                properties.getProperty("zookeeper.kafka.id.weblogs")
        );
        Config topology_config = new Config();

topology_config.setNumWorkers(Integer.valueOf(properties.getProperty("storm.num.workers")));

topology_config.setMaxTaskParallelism(Integer.valueOf(properties.getProperty("storm.max.task.parallelism")));

topology_config.setMessageTimeoutSecs(Integer.valueOf(properties.getProperty("storm.message.timeout.seconds")));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_weblogs",
csKafkaSpout).setMaxSpoutPending(Integer.valueOf(properties.getProperty("storm.max.spout.pending")));
        builder.setBolt("printer", new PrinterBolt())
*                .shuffleGrouping("kafka_weblogs",
csKafkaSpout.getStream())*
        ;
        StormSubmitter.submitTopology("WeblogTopology", topology_config,
builder.createTopology());

}

//   CSKafkaSpout wrapper for KafkaSpout
public final class CSKafkaSpout extends storm.kafka.KafkaSpout {

    private static final Logger logger =
LoggerFactory.getLogger(CSKafkaSpout.class);

    public static final long STORM_KAFKA_FROM_READ_FROM_CURRENT_OFFSET =
kafka.api.OffsetRequest.LatestTime();
    public static final long STORM_KAFKA_FROM_READ_FROM_START =
kafka.api.OffsetRequest.EarliestTime();

    private final String brokerZkStr;
    private final String brokerZkPath;
    private final String kafka_topic;
    private final String zkRoot;
    private final String consumer_id;

    private final BrokerHosts broker_hosts;
    private final SpoutConfig spout_config;

    public static final String STREAM_CSKAFKASPOUT = "stream_kafka_spout";

    public String getStream() {
        return STREAM_CSKAFKASPOUT;
    }

    public CSKafkaSpout(String brokerZkStr, String brokerZkPath, String
kafka_topic, String zkRoot, String consumer_id) {
        *super(generateSpoutConfig(brokerZkStr, brokerZkPath, kafka_topic,
zkRoot, consumer_id));* //  called first, constructor for KafkaSpout
        this.brokerZkStr = brokerZkStr;
        this.brokerZkPath = brokerZkPath;
        this.kafka_topic = kafka_topic;
        this.zkRoot = zkRoot;
        this.consumer_id = consumer_id;
        this.broker_hosts = generateBrokerHosts(this.brokerZkStr,
this.brokerZkPath);
        this.spout_config = generateSpoutConfig(this.broker_hosts,
this.kafka_topic, this.zkRoot, this.consumer_id);
    }

*    @Override*
*    public void declareOutputFields(OutputFieldsDeclarer declarer) {*
*        declarer.declareStream(getStream(),
this.spout_config.scheme.getOutputFields());*
*    }*

    public BrokerHosts getBrokerHosts() {
        return this.broker_hosts;
    }

    public SpoutConfig getSpoutConfig() {
        return this.spout_config;
    }

    public static BrokerHosts generateBrokerHosts(String brokerZkStr,
String brokerZkPath) {
        return new ZkHosts(brokerZkStr, brokerZkPath);
    }

    public static SpoutConfig generateSpoutConfig(String brokerZkStr,
String brokerZkPath, String kafka_topic, String zkRoot, String consumer_id)
{
        BrokerHosts broker_hosts = generateBrokerHosts(brokerZkStr,
brokerZkPath);
        return generateSpoutConfig(broker_hosts, kafka_topic, zkRoot,
consumer_id);
    }

    public static SpoutConfig generateSpoutConfig(BrokerHosts broker_hosts,
String kafka_topic, String zkRoot, String consumer_id) {
        SpoutConfig spout_config = new SpoutConfig(broker_hosts,
kafka_topic, zkRoot, consumer_id);
        //  specific overrides for the spout configuration
        spout_config.scheme = new SchemeAsMultiScheme(new StringScheme());
        spout_config.bufferSizeBytes = 1024*1024*4;
        spout_config.fetchSizeBytes = 1024*1024*4;
        spout_config.forceFromStart = true;
        spout_config.startOffsetTime = STORM_KAFKA_FROM_READ_FROM_START;
        return spout_config;
    }
}

Reply via email to