I have just run the wikifeed example and fed its output to another processor
topology that finds the even numbers. While testing with multiple consumers
in three JVMs, the output topic's partitions were revoked and rebalanced
across the three JVMs. I got one task per partition per subtopology
(task IDs 0_0-0_9 and 1_0-1_9, 20 in total).

i have three topics:

wikifeedInputtopic1 - 10 partitions
wikifeedOutputtopic1 - 10 partitions
sumoutputeventopicC1 - 5 partitions
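
For reference, a minimal sketch of how these three topics could be created
programmatically with the partition counts above (the replication factor of 1
is just an assumption for a single-broker setup):

package kafka.examples.wikifeed;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Arrays.asList(
                    new NewTopic("wikifeedInputtopic1", 10, (short) 1),
                    new NewTopic("wikifeedOutputtopic1", 10, (short) 1),
                    new NewTopic("sumoutputeventopicC1", 5, (short) 1)))
                    .all().get(); // block until the broker confirms creation
        }
    }
}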

I tried to spread the tasks across multiple JVMs.

With only JVM1 running, it was assigned all of the tasks:
[0_0, 0_1, 1_0, 0_2, 1_1, 0_3, 1_2, 0_4, 1_3, 1_4, 0_5, 1_5, 0_6, 1_6, 0_7,
1_7, 0_8, 1_8, 0_9, 1_9]

Then I started a second JVM and got:
JVM1:
current active tasks: [0_0, 1_0, 0_1, 1_1, 0_2, 1_2, 0_3, 1_3, 0_4, 1_4]
    current standby tasks: []
    previous active tasks: [0_0, 0_1, 1_0, 0_2, 1_1, 0_3, 1_2, 0_4, 1_3,
1_4, 0_5, 1_5, 0_6, 1_6, 0_7, 1_7, 0_8, 1_8, 0_9, 1_9]
 (org.apache.kafka.streams.processor.internals.StreamThread)

JVM2:
current active tasks: [0_5, 1_5, 0_6, 1_6, 0_7, 1_7, 0_8, 1_8, 0_9, 1_9]
    current standby tasks: []
    previous active tasks: []

When I started the third JVM:
JVM1:
current active tasks: [0_0, 1_0, 0_1, 1_1, 0_2, 1_2, 1_9]
    current standby tasks: []
    previous active tasks: [0_0, 1_0, 0_1, 1_1, 0_2, 1_2, 0_3, 1_3, 0_4,
1_4]

JVM2:
    current active tasks: [0_5, 1_5, 0_6, 1_6, 0_7, 0_8, 1_8]
    current standby tasks: []
    previous active tasks: [0_5, 0_6, 1_5, 0_7, 1_6, 0_8, 1_7, 0_9, 1_8,
1_9]

JVM3:
    current active tasks: [0_3, 1_3, 0_4, 1_4, 1_7, 0_9]
    current standby tasks: []
    previous active tasks: []

I also checked the state directory of each JVM while starting the three JVMs,
but not every state directory holds the latest task list:

First JVM state directory:
[admin@nms-181 WikiFeedLambdaexampleC2]$ ls
0_0  0_1  0_2  0_3  0_4  0_5  0_6  0_7  0_8  0_9  1_0  1_1  1_2  1_3  1_4
1_5  1_6  1_7  1_8  1_9

Second JVM state directory:
[admin@nms-181 WikiFeedLambdaexampleC2]$ ls
0_5  0_6  0_7  0_8  0_9  1_5  1_6  1_7  1_8  1_9

Third JVM state directory:
[admin@nms-181 WikiFeedLambdaexampleC2]$ ls
0_3  0_4  0_9  1_3  1_4  1_7

Doubts:
1. When running in multiple JVMs with multiple consumers, do the tasks also
get spread across the JVMs?

2. Why does only the third state directory contain the latest task list,
while the others don't?


I have attached my code below:
package kafka.examples.wikifeed;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class CustomDeserializer<T> implements Deserializer<T> {
    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // nothing to configure
    }

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(String topic, byte[] bytes) {
        // Commons-lang deserializes any java.io.Serializable payload.
        return (T) SerializationUtils.deserialize(bytes);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
package kafka.examples.wikifeed;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;
import java.util.Map;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class CustomSerializer<T> implements Serializer<T> {

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T t) {
        // The cast assumes every payload type (e.g. Wikifeed) implements Serializable.
        return SerializationUtils.serialize((Serializable) t);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
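
For completeness, a quick round-trip check of the two classes above (a
throwaway sketch; it relies on the Wikifeed class further down, and the topic
argument to serialize()/deserialize() is unused):

package kafka.examples.wikifeed;

public class SerdeRoundTripCheck {
    public static void main(String[] args) {
        CustomSerializer<Wikifeed> serializer = new CustomSerializer<>();
        CustomDeserializer<Wikifeed> deserializer = new CustomDeserializer<>();

        Wikifeed original = new Wikifeed("pravin", true, "content");
        byte[] bytes = serializer.serialize("anyTopic", original);
        Wikifeed copy = deserializer.deserialize("anyTopic", bytes);

        // SerializationUtils round-trips any Serializable, so the fields survive.
        System.out.println(copy.getName() + " " + copy.isNew() + " " + copy.getContent());
    }
}
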
package kafka.examples.wikifeed;

import java.io.Serializable;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class Wikifeed implements Serializable {

    private String name;
    private boolean isNew;
    private String content;

    public Wikifeed(String name, boolean isNew, String content) {
        this.name = name;
        this.isNew = isNew;
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isNew() {
        return isNew;
    }

    public void setNew(boolean aNew) {
        isNew = aNew;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class wikifeedDriverExample {

    final static String BOOTSTRAP_SERVERS="localhost:9092";
    final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda1";

    public static void main(String[] args) {
        ProducerInput();
        ConsumerOutput();
    }

    public static void ProducerInput(){
        String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam",
                "lauren", "joseph"};

        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass());

        KafkaProducer<String,Wikifeed> producer=new KafkaProducer<String, Wikifeed>(properties);
        Random random=new Random();
        IntStream.range(0,random.nextInt(100000))
                .mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content"))
                .forEach(record -> producer.send(new ProducerRecord<String, Wikifeed>(WikifeedLambdaexample.WIKIFEED_INPUT,null,record)));
        producer.flush();
        producer.close(); // release the producer and its network thread
    }

    public static void ConsumerOutput() {

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
       //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3");

        KafkaConsumer<String, Long> consumer = new KafkaConsumer<String, Long>(properties, new StringDeserializer(), new LongDeserializer());
        consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));

        while (true) {
            consumer.poll(100)
                    .forEach((ConsumerRecord<String, Long> record) ->
                            System.out.println("Topic: " + record.topic()
                                    + " Partition: " + record.partition()
                                    + " Key: " + record.key()
                                    + " = Value: " + record.value()));
        }
    }
}
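
Since I start one consumer per JVM, a variant like the following (hypothetical,
not what I ran) would let the client.id come from the command line instead of
editing the commented-out CLIENT_ID lines and recompiling:

package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class ParameterizedConsumer {
    public static void main(String[] args) {
        // e.g. java ... ParameterizedConsumer C2
        String clientId = args.length > 0 ? args[0] : "C1";

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerWikiFeedLambda1");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);

        KafkaConsumer<String, Long> consumer =
                new KafkaConsumer<>(properties, new StringDeserializer(), new LongDeserializer());
        consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));
        while (true) {
            consumer.poll(100).forEach(record ->
                    System.out.println(record.topic() + "/" + record.partition()
                            + " " + record.key() + " = " + record.value()));
        }
    }
}
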
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedLambdaexample {

    final static String WIKIFEED_INPUT="wikifeedInputtopic1";
    final static String WIKIFEED_OUTPUT="wikifeedOutputtopic1";
    final static String WIKIFEED_LAMBDA="WikiFeedLambdaexampleC2";
    final static String BOOTSTRAP_SERVER="localhost:9092";
    final static String COUNT_STORE="countstore1";
    final static String STAT_DIR="/home/admin/Desktop/kafka_2.12.1.0.0/kafka-streams";
    final static String SUM_OUTPUT_EVEN_TOPIC = "sumoutputeventopicC1";
    final static String EVEN_TABLE = "sumy1";

    public static void main(String[] args) {
        KafkaStreams WikifeedKStreams= getWikifeed();
        WikifeedKStreams.cleanUp();
        WikifeedKStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(WikifeedKStreams::close));

        KafkaStreams sumStreams= getEvenNumSum();
        sumStreams.cleanUp();
        sumStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(sumStreams::close));
    }

    public static KafkaStreams getWikifeed(){

        Properties properties=new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class);
        properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder= new StreamsBuilder();
        KStream<String,Wikifeed> inputStream=builder.stream(WIKIFEED_INPUT);
        KTable<String,Long> kTable=inputStream
                .filter((key, value) -> value.isNew())
                .map(((key, value) -> KeyValue.pair(value.getName(),value)))
                .groupByKey()
                .count(Materialized.as(COUNT_STORE));
        kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
        KafkaStreams streams= new KafkaStreams(builder.build(),properties);

        return streams;
    }

    public static KafkaStreams getEvenNumSum(){

        Properties props = new Properties();
        // application.id is mandatory for every Streams app; without it the
        // KafkaStreams constructor throws a ConfigException. The id below is
        // an illustrative placeholder -- it only needs to differ from WIKIFEED_LAMBDA.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sumEvenLambdaExampleC1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Long().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        StreamsBuilder builder= new StreamsBuilder();
        KStream<String,Long> suminput=builder.stream(WIKIFEED_OUTPUT);
        getKTableForEvenNums(suminput).toStream().to(SUM_OUTPUT_EVEN_TOPIC);

        KafkaStreams kafkaStreams=new KafkaStreams(builder.build(),props);

        return kafkaStreams;
    }

    private static KTable<String, Long> getKTableForEvenNums(KStream<String, Long> sumEvenInput) {
        return sumEvenInput
                .filter((key, value) -> value % 2 == 0)
                .groupByKey()
                .reduce((v1, v2) -> v1 + v2, Materialized.as(EVEN_TABLE));
    }
}
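
To watch the rebalances directly instead of grepping the StreamThread logs, a
state listener can be attached before start(); this is just a sketch wrapped
around getWikifeed():

package kafka.examples.wikifeed;

import org.apache.kafka.streams.KafkaStreams;

public class RebalanceLogger {
    public static void main(String[] args) {
        KafkaStreams streams = WikifeedLambdaexample.getWikifeed();
        // Each REBALANCING -> RUNNING transition marks the point where a new
        // task assignment (like the ones pasted above) becomes stable.
        streams.setStateListener((newState, oldState) ->
                System.out.println("Streams state: " + oldState + " -> " + newState));
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
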
package kafka.examples.wikifeed;


import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;
import java.util.Map;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedSerde<T extends Serializable> implements Serde<T> {

    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;

    public static <T extends Serializable> WikifeedSerde<T> getInstance() {
        return new WikifeedSerde<>();
    }

    public WikifeedSerde() {
        serializer = new CustomSerializer<>();
        deserializer = new CustomDeserializer<>();
    }
    @Override
    public void configure(Map<String, ?> map, boolean b) {
        serializer.configure(map, b);
        deserializer.configure(map, b);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}
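
As an aside (an alternative, not what the example uses), the same Serde can be
assembled without a dedicated class via Serdes.serdeFrom():

package kafka.examples.wikifeed;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class WikifeedSerdeAlternative {
    public static Serde<Wikifeed> wikifeedSerde() {
        return Serdes.serdeFrom(new CustomSerializer<>(), new CustomDeserializer<>());
    }
}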
