I have just done the wikifeed example and fed its output into another processor topology that finds the even numbers. While testing with multiple consumers in three JVMs, the tasks for the output topic are revoked and rebalanced across the three JVMs. I got 10 tasks (the maximum number of partitions).
I have three topics:

wikifeedInputtopic1 - 10 partitions
wikifeedOutputtopic1 - 10 partitions
sumoutputeventopicC1 - 5 partitions

I tried to spread the tasks across multiple JVMs.

In JVM1, I first had these tasks:

[0_0, 0_1, 1_0, 0_2, 1_1, 0_3, 1_2, 0_4, 1_3, 1_4, 0_5, 1_5, 0_6, 1_6, 0_7, 1_7, 0_8, 1_8, 0_9, 1_9]

After I started the second JVM, I got:

JVM1:
current active tasks: [0_0, 1_0, 0_1, 1_1, 0_2, 1_2, 0_3, 1_3, 0_4, 1_4]
current standby tasks: []
previous active tasks: [0_0, 0_1, 1_0, 0_2, 1_1, 0_3, 1_2, 0_4, 1_3, 1_4, 0_5, 1_5, 0_6, 1_6, 0_7, 1_7, 0_8, 1_8, 0_9, 1_9]
(org.apache.kafka.streams.processor.internals.StreamThread)

JVM2:
current active tasks: [0_5, 1_5, 0_6, 1_6, 0_7, 1_7, 0_8, 1_8, 0_9, 1_9]
current standby tasks: []
previous active tasks: []

After I started the third JVM:

JVM1:
current active tasks: [0_0, 1_0, 0_1, 1_1, 0_2, 1_2, 1_9]
current standby tasks: []
previous active tasks: [0_0, 1_0, 0_1, 1_1, 0_2, 1_2, 0_3, 1_3, 0_4, 1_4]

JVM2:
current active tasks: [0_5, 1_5, 0_6, 1_6, 0_7, 0_8, 1_8]
current standby tasks: []
previous active tasks: [0_5, 0_6, 1_5, 0_7, 1_6, 0_8, 1_7, 0_9, 1_8, 1_9]

JVM3:
current active tasks: [0_3, 1_3, 0_4, 1_4, 1_7, 0_9]
current standby tasks: []
previous active tasks: []

I also checked the state directory while starting the three JVMs, but the directories do not reflect the latest task list:

First JVM state directory:
[admin@nms-181 WikiFeedLambdaexampleC2]$ ls
0_0 0_1 0_2 0_3 0_4 0_5 0_6 0_7 0_8 0_9 1_0 1_1 1_2 1_3 1_4 1_5 1_6 1_7 1_8 1_9

Second JVM state directory:
[admin@nms-181 WikiFeedLambdaexampleC2]$ ls
0_5 0_6 0_7 0_8 0_9 1_5 1_6 1_7 1_8 1_9

Third JVM state directory:
[admin@nms-181 WikiFeedLambdaexampleC2]$ ls
0_3 0_4 0_9 1_3 1_4 1_7

Doubts:

1. When running in multiple JVMs with multiple consumers, do the tasks also get spread across the multiple JVMs?
2. Why does only the third state directory have the latest task list, while the others do not?
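To double-check which tasks each JVM actually owns at runtime (rather than only looking at the state directory on disk), I think the running KafkaStreams instance can be asked for its thread metadata. This is just a minimal sketch of what I mean (the helper class below is mine and is not part of the attached code; it assumes a started KafkaStreams instance such as the one returned by getWikifeed()):

package kafka.examples.wikifeed;

import org.apache.kafka.streams.KafkaStreams;

/**
 * Hypothetical helper (not part of my attached code) that prints the tasks
 * currently assigned to this instance, using the thread metadata exposed
 * by a started KafkaStreams instance.
 */
public class TaskAssignmentLogger {

    public static void printAssignedTasks(KafkaStreams streams) {
        streams.localThreadsMetadata().forEach(threadMetadata -> {
            System.out.println("Thread: " + threadMetadata.threadName());
            // Active tasks, e.g. 0_0, 1_4, ..., together with their topic-partitions.
            threadMetadata.activeTasks().forEach(task ->
                    System.out.println("  active task " + task.taskId() + " -> " + task.topicPartitions()));
            // Standby tasks (empty in my runs, since I did not configure standby replicas).
            threadMetadata.standbyTasks().forEach(task ->
                    System.out.println("  standby task " + task.taskId() + " -> " + task.topicPartitions()));
        });
    }
}

I have attached my code below: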
package kafka.examples.wikifeed;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class CustomDeserializer<T> implements Deserializer<T> {

    @Override
    public void configure(Map map, boolean b) {
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        return (T) SerializationUtils.deserialize(bytes);
    }

    @Override
    public void close() {
    }
}
package kafka.examples.wikifeed;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;
import java.util.Map;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class CustomSerializer<T> implements Serializer<T> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, T t) {
        return SerializationUtils.serialize((Serializable) t);
    }

    @Override
    public void close() {
    }
}
package kafka.examples.wikifeed;

import java.io.Serializable;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class Wikifeed implements Serializable {

    private String name;
    private boolean isNew;
    private String content;

    public Wikifeed(String name, boolean isNew, String content) {
        this.name = name;
        this.isNew = isNew;
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isNew() {
        return isNew;
    }

    public void setNew(boolean aNew) {
        isNew = aNew;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class wikifeedDriverExample {

    final static String BOOTSTRAP_SERVERS = "localhost:9092";
    final static String CONSUMER_WIKIFEED_LAMBDA = "ConsumerWikiFeedLambda1";

    public static void main(String[] args) {
        ProducerInput();
        ConsumerOutput();
    }

    public static void ProducerInput() {
        String[] users = {"pravin", "kumar", "erica", "bob", "joe", "damian", "tania", "phil", "sam", "lauren", "joseph"};

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, WikifeedSerde.getInstance().serializer().getClass());

        KafkaProducer<String, Wikifeed> producer = new KafkaProducer<String, Wikifeed>(properties);
        Random random = new Random();
        IntStream.range(0, random.nextInt(100000))
                .mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)], true, "content"))
                .forEach(record -> producer.send(new ProducerRecord<String, Wikifeed>(WikifeedLambdaexample.WIKIFEED_INPUT, null, record)));
        producer.flush();
    }

    public static void ConsumerOutput() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_WIKIFEED_LAMBDA);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C1");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C2");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C3");

        KafkaConsumer<String, Long> consumer = new KafkaConsumer<String, Long>(properties, new StringDeserializer(), new LongDeserializer());
        consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));
        while (true) {
            consumer.poll(100)
                    .forEach((ConsumerRecord<String, Long> consumerRecord) ->
                            System.out.println("Topic :::::::" + consumerRecord.topic() + " "
                                    + "Partition:::::" + consumerRecord.partition() + " "
                                    + "Key::::" + consumerRecord.key() + " = "
                                    + " Value:::::: " + consumerRecord.value()));
        }
    }
}
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedLambdaexample {

    final static String WIKIFEED_INPUT = "wikifeedInputtopic1";
    final static String WIKIFEED_OUTPUT = "wikifeedOutputtopic1";
    final static String WIKIFEED_LAMBDA = "WikiFeedLambdaexampleC2";
    final static String BOOTSTRAP_SERVER = "localhost:9092";
    final static String COUNT_STORE = "countstore1";
    final static String STAT_DIR = "/home/admin/Desktop/kafka_2.12.1.0.0/kafka-streams";
    final static String SUM_OUTPUT_EVEN_TOPIC = "sumoutputeventopicC1";
    final static String EVEN_TABLE = "sumy1";

    public static void main(String[] args) {
        KafkaStreams wikifeedKStreams = getWikifeed();
        wikifeedKStreams.cleanUp();
        wikifeedKStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(wikifeedKStreams::close));

        KafkaStreams sumStreams = getEvenNumSum();
        sumStreams.cleanUp();
        sumStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(sumStreams::close));
    }

    public static KafkaStreams getWikifeed() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, WIKIFEED_LAMBDA);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, WikifeedSerde.class);
        properties.put(StreamsConfig.STATE_DIR_CONFIG, STAT_DIR);
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Wikifeed> inputStream = builder.stream(WIKIFEED_INPUT);
        KTable<String, Long> kTable = inputStream
                .filter((key, value) -> value.isNew())
                .map((key, value) -> KeyValue.pair(value.getName(), value))
                .groupByKey()
                .count(Materialized.as(COUNT_STORE));
        kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

        return new KafkaStreams(builder.build(), properties);
    }

    public static KafkaStreams getEvenNumSum() {
        Properties props = new Properties();
        // NOTE: application.id is required by Kafka Streams; the id below is an assumed
        // name added here for the second topology, distinct from WIKIFEED_LAMBDA.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SumEvenLambdaexampleC1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(StreamsConfig.STATE_DIR_CONFIG, STAT_DIR);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> suminput = builder.stream(WIKIFEED_OUTPUT);
        getKTableForEvenNums(suminput).toStream().to(SUM_OUTPUT_EVEN_TOPIC);

        return new KafkaStreams(builder.build(), props);
    }

    private static KTable<String, Long> getKTableForEvenNums(KStream<String, Long> sumeveninput) {
        return sumeveninput
                .filter((key, value) -> value % 2 == 0)
                .groupByKey()
                .reduce((v1, v2) -> v1 + v2, Materialized.as(EVEN_TABLE));
    }
}
package kafka.examples.wikifeed;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;
import java.util.Map;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedSerde<T extends Serializable> implements Serde<T> {

    Serializer<T> serializer;
    Deserializer<T> deserializer;

    public static WikifeedSerde getInstance() {
        return new WikifeedSerde();
    }

    public WikifeedSerde() {
        serializer = new CustomSerializer();
        deserializer = new CustomDeserializer();
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        serializer.configure(map, b);
        deserializer.configure(map, b);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}