No body have implemented Trident with Kafka yet ?
On Sat, Apr 5, 2014 at 3:44 PM, shamsul haque <[email protected]> wrote: > > Hi team, > > I am making an application to get data from kafka queue and after some > processing push it again in kafka queue in different topic. I got a help > from this > URL<https://github.com/quintona/trident-kafka-push/blob/master/src/test/java/com/github/quintona/TestTopology.java>. > But data is not going in kafka queue. Below is my Topology and KafkaState > (implements State) class. Please tell me where i am doing wrong. > > public class TestTridentTopology { > > public static Logger logger = Logger.getLogger(TestTridentTopology.class); > > public static void main(String[] args) throws AlreadyAliveException, > InvalidTopologyException { > String topologyName = null; > String[] kafkaTopic = null; > int length = args.length; > if (args != null && length > 0) { > topologyName = args[0]; > kafkaTopic = new String[length - 1]; > for (int i = 1; i < length; i++) { > kafkaTopic[i - 1] = args[i]; > } > } > Config conf = new Config(); > conf.setDebug(false); > conf.setNumWorkers(5); > conf.setMaxSpoutPending(5); > conf.setMaxTaskParallelism(3); > if (topologyName != null) { > StormSubmitter.submitTopology(topologyName, conf, > buildTopology(kafkaTopic)); > } else { > LocalCluster cluster = new LocalCluster(); > cluster.submitTopology("test", conf, buildTopology(kafkaTopic)); > Utils.sleep(10000); > } > } > > /** > * > * @param service > * @return > */ > public static StormTopology buildTopology(String[] kafkaTopic) { > > try { > BrokerHosts brokerHost = new ZkHosts("localhost:2181", "/brokers"); > > TridentKafkaConfig config = new TridentKafkaConfig(brokerHost, > "testtopic1"); > config.forceStartOffsetTime(-2); > config.scheme = new SchemeAsMultiScheme(new StringScheme()); > > TransactionalTridentKafkaSpout spout = new > TransactionalTridentKafkaSpout(config); > > TridentTopology topology = new TridentTopology(); > TridentState parallelismHint = topology.newStream("feed", spout) > .shuffle() > .each(new Fields("str"), new TridentFetcherBolt(), new > Fields("textJSON")) > .partitionPersist(KafkaState.transactional("testtopic2", new > KafkaState.Options()), new KafkaStateUpdater("textJSON")) > .parallelismHint(1); > > logger.warn("parallelismHint" + parallelismHint); > > return topology.build(); > } catch (Exception e) { > logger.error("Exception: ", e); > } > return null; > > } > } > > > public class KafkaState implements State { > > private static final Logger logger = Logger.getLogger(KafkaState.class); > > ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<String>(); > > public static class Options implements Serializable { > > public String zookeeperHost = "127.0.0.1"; > public int zookeeperPort = 2181; > public String serializerClass = "kafka.serializer.StringEncoder"; > public String kafkaConnect = "127.0.0.1:9092"; > > public Options() { > logger.debug("KafkaState::Options()"); > } > > public Options(String zookeeperHost, int zookeeperPort, String > serializerClass, String topicName) { > this.zookeeperHost = zookeeperHost; > this.zookeeperPort = zookeeperPort; > this.serializerClass = serializerClass; > } > } > > public static StateFactory transactional(String topic, Options options) { > logger.debug("KafkaState::transactional: " + topic); > return new Factory(topic, options, true); > } > > public static StateFactory nonTransactional(String topic, Options options) { > return new Factory(topic, options, false); > } > > protected static class Factory implements StateFactory { > > private Options options; > private String topic; > boolean transactional; > > public Factory(String topic, Options options, boolean transactional) { > this.options = options; > this.topic = topic; > this.transactional = transactional; > } > > @Override > public State makeState(Map conf, IMetricsContext metrics, > int partitionIndex, int numPartitions) { > return new KafkaState(topic, options, transactional); > } > > } > > private Options options; > private String topic; > Producer<String, String> producer; > private boolean transactional; > > public KafkaState(String topic, Options options, boolean transactional) { > this.topic = topic; > this.options = options; > this.transactional = transactional; > Properties props = new Properties(); > props.put("zk.connect", options.zookeeperHost + ":" + > Integer.toString(options.zookeeperPort)); > props.put("serializer.class", options.serializerClass); > props.put("metadata.broker.list", options.kafkaConnect); > > ProducerConfig config = new ProducerConfig(props); > producer = new Producer<>(config); > logger.debug("producer initialized successfully." + producer); > } > > @Override > public void beginCommit(Long txid) { > logger.debug("KafkaState::beginCommit"); > if (messages.size() > 0) { > throw new RuntimeException("Kafka State is invalid, the previous > transaction didn't flush"); > } > } > > public void enqueue(String message) { > logger.debug("KafkaState::enqueue ^^^^^^^^^ " + message); > if (transactional) { > messages.add(message); > } else { > sendMessage(message); > } > } > > private void sendMessage(String message) { > KeyedMessage<String, String> data = new KeyedMessage<String, > String>(topic, message); > producer.send(data); > } > > @Override > public void commit(Long txid) { > String message = messages.poll(); > logger.debug("KafkaState::commit @@@@@@@@@@@@@@ " + message); > > while (message != null) { > sendMessage(message); > message = messages.poll(); > } > } > } > > Thanks > > > >
