Hi All, I am trying to start KafkaStreamer and I really need help.
I used an example http://apacheignite.gridgain.org/docs/getting-started as a pattern and added only necessary properties. I consider that incorrect properties are the main problem. But I didn't find any information what values are correct. When I start KafkaStreamer and send message I see errors in Ignite logs that properties are not valid and warning about ignoring my message Could you please take a look at log errors and my function and advice how I can fix it. [15:58:44,569][INFO ][ignite-#57%null%][VerifiableProperties] Verifying properties [15:58:44,615][WARN ][ignite-#57%null%][VerifiableProperties] Property bootstrap.servers is not valid [15:58:44,615][INFO ][ignite-#57%null%][VerifiableProperties] Property group.id is overridden to test [15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property key.deserializer is not valid [15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property key.serializer is not valid [15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property value.deserializer is not valid [15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property value.serializer is not valid [15:59:45,077][WARN ][pool-4-thread-3][root] Message is ignored due to an error [msg=MessageAndMetadata(test-topic,0,Message(magic = 0, attributes = 0, crc = 1312744161, key = java.nio.HeapByteBuffer[pos=0 lim=4 cap=13], payload = java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]),94,kafka.serializer.StringDecoder@6079d2fa,kafka.serializer.StringDecoder@6ee18190)] @Override public void execute(ServiceContext ctx) throws Exception { KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>(); try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(stmCache.getName())) { // Allow data updates. stmr.allowOverwrite(true); kafkaStreamer.setIgnite(ignite); kafkaStreamer.setStreamer(stmr); // set the topic kafkaStreamer.setTopic("test-topic"); // set the number of threads to process Kafka streams kafkaStreamer.setThreads(4); // set Kafka consumer configurations Properties props = new Properties(); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("bootstrap.servers", "localhost:9092"); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "test"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); ConsumerConfig config = new ConsumerConfig(props); kafkaStreamer.setConsumerConfig(config); // set decoders StringDecoder keyDecoder = new StringDecoder(null); StringDecoder valueDecoder = new StringDecoder(null); kafkaStreamer.setKeyDecoder(keyDecoder); kafkaStreamer.setValueDecoder(valueDecoder); kafkaStreamer.start(); System.out.println("Kafka streamer started!"); } } -- View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Need-help-with-Ignite-KafkaStreamer-tp5186.html Sent from the Apache Ignite Users mailing list archive at Nabble.com.
