I have run the wikifeed example. I have three topics:
wikifeedInputtopicDemo2 — 10 partitions
wikifeedOutputtopicDemo2 — 10 partitions
sumoutputeventopicDemo2 — 5 partitions
I produced 10 records, but the input topic
(wikifeedInputtopicDemo2) has received more than 10
records.
Can someone explain how this happens?
[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -1
wikifeedInputtopicDemo2:8:13400
wikifeedInputtopicDemo2:2:13401
wikifeedInputtopicDemo2:5:13400
wikifeedInputtopicDemo2:4:13400
wikifeedInputtopicDemo2:7:13399
wikifeedInputtopicDemo2:1:13399
wikifeedInputtopicDemo2:9:13400
wikifeedInputtopicDemo2:3:13400
wikifeedInputtopicDemo2:6:13400
wikifeedInputtopicDemo2:0:13400
Here is my processor topology code:
//
/**
 * Builds the wikifeed Streams topology: reads Wikifeed events from the input
 * topic, keeps only new entries, re-keys by author name, counts per author,
 * and writes the running counts to the output topic.
 *
 * @return an unstarted {@link KafkaStreams} instance; the caller must start/close it
 */
public static KafkaStreams getWikifeed() {
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, WIKIFEED_LAMBDA);
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, WikifeedSerde.class);
    properties.put(StreamsConfig.STATE_DIR_CONFIG, STAT_DIR);
    //properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
    // Start from the beginning of the topic when no committed offset exists.
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    StreamsBuilder builder = new StreamsBuilder();
    // Typed stream instead of raw KStream/KTable: keys are Strings (default key
    // serde), values are Wikifeed records (default value serde).
    KStream<String, Wikifeed> inputStream = builder.stream(WIKIFEED_INPUT);
    KTable<String, Long> kTable = inputStream
            .filter((key, value) -> value.isNew())                       // keep only new entries
            .map((key, value) -> KeyValue.pair(value.getName(), value))  // re-key by author
            .groupByKey()
            .count(Materialized.as(COUNT_STORE));                        // per-author running count
    // Counts are <String, Long>, matching the Produced serdes below.
    kTable.toStream().to(WIKIFEED_OUTPUT,
            Produced.with(Serdes.String(), Serdes.Long()));
    return new KafkaStreams(builder.build(), properties);
}
My driver code is in the attached file:
package kafka.examples.wikifeed;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;
/**
* Created by PravinKumar on 29/7/17.
*/
public class wikifeedDriverExample {
final static String BOOTSTRAP_SERVERS="localhost:9092";
final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda1";
// Entry point: publishes a batch of random wikifeed events, then blocks
// consuming the aggregated per-author counts from the output topic.
public static void main(String[] args) {
ProducerInput();
ConsumerOutput();
}
/**
 * Publishes exactly 10 Wikifeed records (random authors, null keys) to the
 * input topic. Null keys mean the producer spreads records across partitions.
 *
 * Fixes vs. the original:
 *  - IntStream.range(0, random.nextInt(10)) sent a RANDOM count between 0 and 9
 *    records per run, not 10 — which explains why the observed topic offsets do
 *    not line up with "10 records produced". Now sends exactly 10.
 *  - The KafkaProducer was never closed (only flushed); try-with-resources
 *    closes it, which also flushes any buffered records.
 */
public static void ProducerInput() {
    String[] users = {"pravin", "kumar", "erica", "bob", "joe", "damian", "tania", "phil", "sam",
            "lauren", "joseph"};
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            WikifeedSerde.getInstance().serializer().getClass());
    Random random = new Random();
    try (KafkaProducer<String, Wikifeed> producer = new KafkaProducer<>(properties)) {
        IntStream.range(0, 10)  // exactly 10 records, as intended
                .mapToObj(i -> new Wikifeed(users[random.nextInt(users.length)], true, "content"))
                .forEach(record -> producer.send(
                        new ProducerRecord<>(WikifeedLambdaexample.WIKIFEED_INPUT, null, record)));
        producer.flush();
    }  // close() flushes and releases network/IO resources
}
public static void ConsumerOutput() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
//properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3");
KafkaConsumer consumer = new KafkaConsumer(properties, new StringDeserializer(), new LongDeserializer());
consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));
while (true) {
consumer.poll(100)