Here is some code that I used. I made the Kafka Ignite Streamer as a service
in Ignite and deploy it as a cluster singleton.
public class IgniteKafkaStreamerService implements Service {
private static final long serialVersionUID = 1L;
@IgniteInstanceResource
private Ignite ignite;
private KafkaStreamer<String, String> kafkaStreamer = new
KafkaStreamer<>();
private IgniteLogger logger;
@Override
public void init(ServiceContext ctx) throws Exception {
logger = ignite.log();
IgniteDataStreamer<String, String> stmr =
ignite.dataStreamer(CACHE_NAME);
stmr.allowOverwrite(true);
stmr.autoFlushFrequency(1000);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
kafkaStreamer.setThreads(4);
kafkaStreamer.setTopic(KAFKA_TOPIC);
Properties kafkaProps =
Util.loadProperties("config/kafka.properties"); //
Some code to read in the kafka properties
kafkaStreamer.setConsumerConfig(new ConsumerConfig(kafkaProps));
kafkaStreamer.setSingleTupleExtractor(msg -> new
AbstractMap.SimpleEntry<String, String>(new String(msg.key()), new
String(msg.message())));
}
@Override
public void execute(ServiceContext ctx) throws Exception {
kafkaStreamer.start();
logger.info("KafkaStreamer started.");
}
@Override
public void cancel(ServiceContext ctx) {
kafkaStreamer.stop();
logger.info("KafkaStreamer stopped.");
}
}
Below the code to startup:
public class IgniteNodeStartup {
public static void main(String[] args) {
// Use to start up an Ignite server with default configuration
Ignite ignite = Ignition.start();
ignite.getOrCreateCache(getCacheConfiguration());
// Deploy data streamer service on the server nodes.
ClusterGroup forServers = ignite.cluster().forServers();
ignite.services(forServers).deployClusterSingleton("KafkaService", new
IgniteKafkaStreamerService());
}
private static CacheConfiguration<String, String>
getCacheConfiguration() {
CacheConfiguration<String, String> cfg = new
CacheConfiguration<String,
String>();
cfg.setName(CACHE_NAME);
cfg.setBackups(1);
return cfg;
}
}
I hope this helps.
Humphrey
--
View this message in context:
http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-tp12649p12836.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.