Hi,
I am doing storm-kafka POC on windows 7.
1) Downloaded kafka 2.10 and Storm 0.9.4
2) Started zookeeper from kafka bin on localhost 2181
3) Created test topic in kafka
4) Console producer and consumer working fine on test topic
5) Storm word reader is working fine in local mode.
6) Now following program is having issue.
public class TopologySki2 {
public static class PrinterBolt extends BaseBasicBolt {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory
.getLogger(PrinterBolt.class);
public void prepare(@SuppressWarnings("rawtypes") Map conf,
TopologyContext context, OutputCollector collector) {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("Printing tuple with toString(): "
+ tuple.toString());
System.out.println("Printing tuple with getString(): "
+ tuple.getString(0));
logger.info("Logging tuple with logger: " + tuple.getString(0));
}
}
public static void main(String[] args) throws Exception {
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.ZkHosts(
"localhost:2181", "/brokers"),"test", "/ski2",
UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.forceFromStart = true;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka_spout_2", new KafkaSpout(spoutConfig), 1);
builder.setBolt("printer", new
PrinterBolt()).shuffleGrouping("kafka_spout_2");
LocalCluster cluster = new LocalCluster();
Config stormConf = new Config();
stormConf.setDebug(true);
List<String> stromZkHostList = new ArrayList<String>();
stromZkHostList.add("localhost");
stormConf.put("storm.zookeeper.servers", stromZkHostList);
stormConf.put("storm.zookeeper.port", 2181);
cluster.submitTopology("ski-test-2", stormConf,
builder.createTopology());
System.out.println("topology 2 created..");
}
}
Issues: It is not reading the message from kafka topic.
No error or exception in log. It is emitting spout and bolt metrics.
Logs:
1715252 [Thread-20] INFO backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
1716249 [Thread-16] INFO backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
nextTuple:
nextTuple:
nextTuple:
nextTuple:
nextTuple:
nextTuple:
1745255 [Thread-20] INFO backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
1745255 [Thread-18] INFO backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
1745255 [Thread-20] INFO backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
1745256 [Thread-20] INFO backtype.storm.daemon.task - Emitting: __acker
__metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@2bee5d56> [#<DataPoint
[__emit-count = {__metrics=0}]> #<DataPoint [__process-latency = {}]>
#<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count =
{__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint
[__fail-count = {}]> #<DataPoint [__execute-count = {__system:__tick=0}]>]]
1745256 [Thread-18] INFO backtype.storm.daemon.task - Emitting: printer
__metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@56853978> [#<DataPoint
[__emit-count = {__metrics=0}]> #<DataPoint [__process-latency = {}]>
#<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count =
{__metrics=0}]> #<DataPoint [__execute-latency = {}]> #<DataPoint
[__fail-count = {}]> #<DataPoint [__execute-count = {}]>]]
1746258 [Thread-16] INFO backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __tick, id: {}, [30]
1746258 [Thread-16] INFO backtype.storm.daemon.executor - Processing
received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
1746258 [Thread-16] INFO storm.kafka.ZkCoordinator - Refreshing partition
manager connections
1746259 [Thread-16] INFO storm.kafka.ZkCoordinator - Deleted partition
managers: []
1746260 [Thread-16] INFO storm.kafka.ZkCoordinator - New partition
managers: []
1746260 [Thread-16] INFO storm.kafka.ZkCoordinator - Finished refreshing
1746260 [Thread-16] INFO backtype.storm.daemon.task - Emitting:
kafka_spout_2 __metrics [#<TaskInfo
backtype.storm.metric.api.IMetricsConsumer$TaskInfo@53ef96e9> [#<DataPoint
[__complete-latency = {}]> #<DataPoint [__emit-count = {__metrics=0}]>
#<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count =
{__metrics=0}]> #<DataPoint [__fail-count = {}]> #<DataPoint [kafkaOffset =
{totalSpoutLag=0, totalLatestEmittedOffset=0, totalLatestTime=0}]>]]
nextTuple:
Please suggest.
Thanks
Jatin