Hi team,
I am building an application that reads data from a Kafka queue and, after some
processing, pushes it back to Kafka on a different topic. I based it on the
example at this
URL<https://github.com/quintona/trident-kafka-push/blob/master/src/test/java/com/github/quintona/TestTopology.java>.
However, the data is not reaching the Kafka queue. Below are my topology and
my KafkaState
(implements State) class. Please tell me what I am doing wrong.
/**
 * Launcher for a Trident topology that reads strings from a Kafka topic,
 * passes them through {@code TridentFetcherBolt}, and persists the resulting
 * {@code textJSON} field through {@code KafkaState}.
 */
public class TestTridentTopology {

    public static Logger logger = Logger.getLogger(TestTridentTopology.class);

    /**
     * Submits the topology either to a remote cluster or to an in-process
     * {@code LocalCluster}.
     *
     * @param args optional; {@code args[0]} is the topology name and any
     *             remaining entries are forwarded to
     *             {@link #buildTopology(String[])}. With no arguments the
     *             topology runs locally under the name {@code "test"}.
     * @throws AlreadyAliveException    propagated from topology submission
     * @throws InvalidTopologyException propagated from topology submission
     */
    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        String topologyName = null;
        String[] kafkaTopic = null;

        // Test for null BEFORE reading args.length; the reverse order made
        // the null guard unreachable and threw NPE on a null array.
        if (args != null && args.length > 0) {
            topologyName = args[0];
            kafkaTopic = new String[args.length - 1];
            System.arraycopy(args, 1, kafkaTopic, 0, kafkaTopic.length);
        }

        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(5);
        conf.setMaxSpoutPending(5);
        conf.setMaxTaskParallelism(3);

        if (topologyName != null) {
            StormSubmitter.submitTopology(topologyName, conf,
                    buildTopology(kafkaTopic));
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, buildTopology(kafkaTopic));
            // main() only pauses for 10 s here; no explicit shutdown of the
            // local cluster is issued afterwards.
            Utils.sleep(10000);
        }
    }

    /**
     * Builds the Trident topology: Kafka spout -> shuffle ->
     * {@code TridentFetcherBolt} -> {@code partitionPersist} into
     * {@code KafkaState}.
     *
     * <p>The ZooKeeper address ({@code localhost:2181}), the source topic
     * ({@code testtopic1}) and the destination topic ({@code testtopic2})
     * are hard-coded in this method.
     *
     * @param kafkaTopic topic names taken from the command line; currently
     *                   not read by this method (may be {@code null})
     * @return the built topology, or {@code null} if any exception was thrown
     *         while assembling it (the exception is logged, not rethrown)
     */
    public static StormTopology buildTopology(String[] kafkaTopic) {
        try {
            BrokerHosts brokerHost = new ZkHosts("localhost:2181", "/brokers");
            TridentKafkaConfig config = new TridentKafkaConfig(brokerHost,
                    "testtopic1");
            // NOTE(review): -2 is presumably Kafka's "earliest offset"
            // sentinel - confirm against the storm-kafka version in use.
            config.forceStartOffsetTime(-2);
            config.scheme = new SchemeAsMultiScheme(new StringScheme());
            TransactionalTridentKafkaSpout spout =
                    new TransactionalTridentKafkaSpout(config);

            TridentTopology topology = new TridentTopology();
            TridentState parallelismHint = topology.newStream("feed", spout)
                    .shuffle()
                    .each(new Fields("str"), new TridentFetcherBolt(),
                            new Fields("textJSON"))
                    .partitionPersist(
                            KafkaState.transactional("testtopic2",
                                    new KafkaState.Options()),
                            new KafkaStateUpdater("textJSON"))
                    .parallelismHint(1);
            logger.warn("parallelismHint" + parallelismHint);
            return topology.build();
        } catch (Exception e) {
            logger.error("Exception: ", e);
        }
        return null;
    }
}
public class KafkaState implements State {
private static final Logger logger = Logger.getLogger(KafkaState.class);
ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<String>();
public static class Options implements Serializable {
public String zookeeperHost = "127.0.0.1";
public int zookeeperPort = 2181;
public String serializerClass = "kafka.serializer.StringEncoder";
public String kafkaConnect = "127.0.0.1:9092";
public Options() {
logger.debug("KafkaState::Options()");
}
public Options(String zookeeperHost, int zookeeperPort, String
serializerClass, String topicName) {
this.zookeeperHost = zookeeperHost;
this.zookeeperPort = zookeeperPort;
this.serializerClass = serializerClass;
}
}
public static StateFactory transactional(String topic, Options options) {
logger.debug("KafkaState::transactional: " + topic);
return new Factory(topic, options, true);
}
public static StateFactory nonTransactional(String topic, Options options) {
return new Factory(topic, options, false);
}
protected static class Factory implements StateFactory {
private Options options;
private String topic;
boolean transactional;
public Factory(String topic, Options options, boolean transactional) {
this.options = options;
this.topic = topic;
this.transactional = transactional;
}
@Override
public State makeState(Map conf, IMetricsContext metrics,
int partitionIndex, int numPartitions) {
return new KafkaState(topic, options, transactional);
}
}
private Options options;
private String topic;
Producer<String, String> producer;
private boolean transactional;
public KafkaState(String topic, Options options, boolean transactional) {
this.topic = topic;
this.options = options;
this.transactional = transactional;
Properties props = new Properties();
props.put("zk.connect", options.zookeeperHost + ":" +
Integer.toString(options.zookeeperPort));
props.put("serializer.class", options.serializerClass);
props.put("metadata.broker.list", options.kafkaConnect);
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<>(config);
logger.debug("producer initialized successfully." + producer);
}
@Override
public void beginCommit(Long txid) {
logger.debug("KafkaState::beginCommit");
if (messages.size() > 0) {
throw new RuntimeException("Kafka State is invalid, the
previous transaction didn't flush");
}
}
public void enqueue(String message) {
logger.debug("KafkaState::enqueue ^^^^^^^^^ " + message);
if (transactional) {
messages.add(message);
} else {
sendMessage(message);
}
}
private void sendMessage(String message) {
KeyedMessage<String, String> data = new KeyedMessage<String,
String>(topic, message);
producer.send(data);
}
@Override
public void commit(Long txid) {
String message = messages.poll();
logger.debug("KafkaState::commit @@@@@@@@@@@@@@ " + message);
while (message != null) {
sendMessage(message);
message = messages.poll();
}
}
}
Thanks