Hi All,

I am trying to start KafkaStreamer and I really need help.

I used the example at http://apacheignite.gridgain.org/docs/getting-started as a
pattern and added only the necessary properties. I suspect that incorrect
properties are the main problem, but I could not find any information about
which values are correct. When I start KafkaStreamer and send a message, I see
warnings in the Ignite logs that several properties are not valid, and a
warning that my message was ignored.

Could you please take a look at the log output and my function below and
advise how I can fix it?

[15:58:44,569][INFO ][ignite-#57%null%][VerifiableProperties] Verifying properties
[15:58:44,615][WARN ][ignite-#57%null%][VerifiableProperties] Property bootstrap.servers is not valid
[15:58:44,615][INFO ][ignite-#57%null%][VerifiableProperties] Property group.id is overridden to test
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property key.deserializer is not valid
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property key.serializer is not valid
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property value.deserializer is not valid
[15:58:44,616][WARN ][ignite-#57%null%][VerifiableProperties] Property value.serializer is not valid

[15:59:45,077][WARN ][pool-4-thread-3][root] Message is ignored due to an error [msg=MessageAndMetadata(test-topic,0,Message(magic = 0, attributes = 0, crc = 1312744161, key = java.nio.HeapByteBuffer[pos=0 lim=4 cap=13], payload = java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]),94,kafka.serializer.StringDecoder@6079d2fa,kafka.serializer.StringDecoder@6ee18190)]

    @Override
    public void execute(ServiceContext ctx) throws Exception {
        KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

        try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(stmCache.getName())) {
            // Allow data updates.
            stmr.allowOverwrite(true);

            kafkaStreamer.setIgnite(ignite);
            kafkaStreamer.setStreamer(stmr);

            // Set the topic.
            kafkaStreamer.setTopic("test-topic");

            // Set the number of threads to process Kafka streams.
            kafkaStreamer.setThreads(4);

            // Set Kafka consumer configurations.
            Properties props = new Properties();
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            props.put("bootstrap.servers", "localhost:9092");
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "test");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            ConsumerConfig config = new ConsumerConfig(props);
            kafkaStreamer.setConsumerConfig(config);

            // Set decoders.
            StringDecoder keyDecoder = new StringDecoder(null);
            StringDecoder valueDecoder = new StringDecoder(null);
            kafkaStreamer.setKeyDecoder(keyDecoder);
            kafkaStreamer.setValueDecoder(valueDecoder);

            kafkaStreamer.start();
            System.out.println("Kafka streamer started!");
        }
    }



