Fwd: Consumer slowness issue

2018-03-27 Thread pravin kumar
-- Forwarded message --
From: "pravin kumar" <pk007...@gmail.com>
Date: 27-Mar-2018 6:11 PM
Subject: Consumer slowness issue
To: <users@kafka.apache.org>
Cc:

I have two topics with 5 partitions each:

wikifeedInputT10 (input) and wikifeedOutputT15 (output).

The KafkaProducer produces 10 elements, and wikifeedInputT10 has
received these elements.
[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedInputT10 --time -1

wikifeedInputT10:2:2
wikifeedInputT10:4:2
wikifeedInputT10:1:2
wikifeedInputT10:3:2
wikifeedInputT10:0:2

But after processing, reading from my output topic wikifeedOutputT15,
I received:


[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedOutputT15 --time -1
wikifeedOutputT15:2:1
wikifeedOutputT15:4:1
wikifeedOutputT15:1:3
wikifeedOutputT15:3:3
wikifeedOutputT15:0:3

I received the following output in my console:

[2018-03-27 17:55:32,359] INFO Kafka version : 1.0.1
(org.apache.kafka.common.utils.AppInfoParser)
[2018-03-27 17:55:32,359] INFO Kafka commitId : c0518aa65f25317e
(org.apache.kafka.common.utils.AppInfoParser)
[2018-03-27 17:55:32,600] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] Discovered group coordinator
nms-181.nmsworks.co.in:9092 (id: 2147483647 rack: null)
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,602] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] Revoking previously assigned
partitions [] (org.apache.kafka.clients.consumer.internals.
ConsumerCoordinator)
[2018-03-27 17:55:32,602] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] (Re-)joining group
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,610] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] Successfully joined group with
generation 5 (org.apache.kafka.clients.consumer.internals.
AbstractCoordinator)
[2018-03-27 17:55:32,611] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] Setting newly assigned partitions
[wikifeedOutputT15-2, wikifeedOutputT15-1, wikifeedOutputT15-0,
wikifeedOutputT15-4, wikifeedOutputT15-3]
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Topic :::wikifeedOutputT15 Partition:0 Keyjoe  =  Value:: 18263
Topic :::wikifeedOutputT15 Partition:0 Keyphil  =  Value:: 18230
Topic :::wikifeedOutputT15 Partition:0 Keytania  =  Value:: 18344
Topic :::wikifeedOutputT15 Partition:1 Keypravin  =  Value:: 18140
Topic :::wikifeedOutputT15 Partition:1 Keykumar  =  Value:: 18248
Topic :::wikifeedOutputT15 Partition:1 Keyjoseph  =  Value:: 18116
Topic :::wikifeedOutputT15 Partition:2 Keylauren  =  Value:: 18150
Topic :::wikifeedOutputT15 Partition:3 Keybob  =  Value:: 18131
Topic :::wikifeedOutputT15 Partition:3 Keyerica  =  Value:: 18084
Topic :::wikifeedOutputT15 Partition:3 Keydamian  =  Value:: 18126
Topic :::wikifeedOutputT15 Partition:4 Keysam  =  Value:: 18168


It stops here and I am not getting any more messages.
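
To check whether the consumer is simply lagging behind, I believe the group's
current offsets and lag can be inspected with the consumer-groups tool,
something like:

[admin@nms-181 bin]$ sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group ConsumerWikiFeedLambda4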

I have attached my code below.
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedLambdaexample {

final static String WIKIFEED_INPUT="wikifeedInputT10";
final static String WIKIFEED_OUTPUT="wikifeedOutputT15";
final static String WIKIFEED_LAMBDA="WikiFeedLambdaT10";
//final static String SUM_LAMBDA="sumlambda10";
final static String BOOTSTRAP_SERVER="localhost:9092";
final static String COUNT_STORE="countstoreT10";
final static String STAT_DIR="/home/admin/Document/kafka_2.11.1.0.1/kafka-streams";
//final static String SUM_OUTPUT_EVEN_TOPIC = "sumoutputeventopicT10";
   // final static String EVEN_TABLE = "sumDemo10";

public static void main(String[] args) {

ProducerInput();
KafkaStreams WikifeedKStreams= getWikifeed();
WikifeedKStreams.cleanUp();
WikifeedKStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(WikifeedKStreams::close));

/*KafkaStreams sumStreams= getEvenNumSum();
sumStreams.cleanUp();
sumStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(sumStreams::close));*/
}

public static void ProducerInput(){
String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam",
"lauren", "joseph"};

Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass());

KafkaProducer<String,Wikifeed> producer=new KafkaProducer<String, Wikifeed>(properties);
Random random=new Random();
IntStream.range(0,10)
.mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content"))
.forEach(record -> producer.send(new ProducerRecord<String, Wikifeed>(WikifeedLambdaexample.WIKIFEED_INPUT,null,record)));
producer.flush();
}


public static KafkaStreams getWikifeed(){

Properties properties=new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class);
properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
//properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
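// Just a guess: with record caching enabled, count() results are only flushed
// to the output topic on commit, so output can appear slowly; these
// (commented) settings would flush updates sooner.
//properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
//properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);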

StreamsBuilder builder= new StreamsBuilder();
KStream<String,Wikifeed> inputStream=builder.stream(WIKIFEED_INPUT);
KTable<String,Long> kTable=inputStream
.filter((key, value) -> value.isNew())
.map(((key, value) -> KeyValue.pair(value.getName(),value)))
.groupByKey()
.count(Materialized.as(COUNT_STORE));
kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams= new KafkaStreams(builder.build(),properties);

return streams;
}

/*public static KafkaStreams getEvenNumSum(){

Properties props=new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, SUM_LAMBDA);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
props.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Long().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

StreamsBuilder builder= new StreamsBuilder();
KStream<String,Long> suminput=builder.stream(WIKIFEED_OUTPUT);
getKTableForEvenNums(suminput).toStream().to(SUM_OUTPUT_EVEN_TOPIC);

KafkaStreams kafkaStreams=new KafkaStreams(builder.build(),props);

return kafkaStreams;
}*/

   /* private static KTable getKTableForEvenNums(KStream<String,Long> sumeveninput){
KTable<String, Long> evenKTable=sumeveninput
.filter((key,value)-> value%2 ==0)
.groupByKey()
.reduce((v1, v2)-> v1 + v2,Materialized.as(EVEN_TABLE));
return evenKTable;
}*/
}
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class wikifeedDriverExample {

final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda4";

public static void main(String[] args) {
ConsumerOutput();
}

public static void ConsumerOutput() {

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, WikifeedLambdaexample.BOOTSTRAP_SERVER);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");

KafkaConsumer<String, Long> consumer = new KafkaConsumer<String, Long>(properties, new StringDeserializer(), new LongDeserializer());
consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));

while (true) {
consumer.poll(100)
.forEach((ConsumerRecord<String, Long> consumerRecord) -> System.out.println("Topic :::" +consumerRecord.topic() + " " + "Partition:" + consumerRecord.partition()+ " " + "Key" +consumerRecord.key()+ " " + " = " + " Value:: " +consumerRecord.value()));
}
}
}

Producing more number of Records than expected

2018-03-06 Thread pravin kumar
I have run the wikifeed example. I have three topics:
wikifeedInputtopicDemo2 - 10 partitions
wikifeedOutputtopicDemo2 - 10 partitions
sumoutputeventopicDemo2 - 5 partitions

I have produced 10 records, but the input topic (wikifeedInputtopicDemo2)
has received many more than 10 records.
Can someone explain how this happens?

[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -1
wikifeedInputtopicDemo2:8:13400
wikifeedInputtopicDemo2:2:13401
wikifeedInputtopicDemo2:5:13400
wikifeedInputtopicDemo2:4:13400
wikifeedInputtopicDemo2:7:13399
wikifeedInputtopicDemo2:1:13399
wikifeedInputtopicDemo2:9:13400
wikifeedInputtopicDemo2:3:13400
wikifeedInputtopicDemo2:6:13400
wikifeedInputtopicDemo2:0:13400
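
I also wonder whether these counts include records left over from earlier
runs; comparing against the earliest offsets (--time -2) should show how many
records are actually in the topic's current range, I think:

[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -2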

Here is my processorTopology code:

//

public static KafkaStreams getWikifeed(){

Properties properties=new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());

properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class);
properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
//properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

StreamsBuilder builder= new StreamsBuilder();
KStream<String,Wikifeed> inputStream=builder.stream(WIKIFEED_INPUT);
KTable<String,Long> kTable=inputStream
.filter((key, value) -> value.isNew())
.map(((key, value) -> KeyValue.pair(value.getName(),value)))
.groupByKey()
.count(Materialized.as(COUNT_STORE));
kTable.toStream().to(WIKIFEED_OUTPUT,
Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams= new KafkaStreams(builder.build(),properties);

return streams;
}

->


My driver code is in the attached file.
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class wikifeedDriverExample {

final static String BOOTSTRAP_SERVERS="localhost:9092";
final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda1";

public static void main(String[] args) {
ProducerInput();
ConsumerOutput();
}

public static void ProducerInput(){
String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam",
"lauren", "joseph"};

Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass());

KafkaProducer<String,Wikifeed> producer=new KafkaProducer<String, Wikifeed>(properties);
Random random=new Random();
IntStream.range(0,random.nextInt(10))
.mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content"))
.forEach(record -> producer.send(new ProducerRecord<String, Wikifeed>(WikifeedLambdaexample.WIKIFEED_INPUT,null,record)));
producer.flush();
}

public static void ConsumerOutput() {

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
  //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3");

KafkaConsumer<String, Long> consumer = new KafkaConsumer<String, Long>(properties, new StringDeserializer(), new LongDeserializer());
consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));

while (true) {
consumer.poll(100)
.forEach((ConsumerRecord<String, Long> consumerRecord) -> System.out.println("Topic :::" +consumerRecord.topic() + " " + "Partition:" + consumerRecord.partition()+ " " + "Key" +consumerRecord.key()+ " " + " = " + " Value:: " +consumerRecord.value()));
}
}
}



Tasks across MultipleJVM

2018-02-28 Thread pravin kumar
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

public class wikifeedDriverExample {

final static String BOOTSTRAP_SERVERS="localhost:9092";
final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda1";

public static void main(String[] args) {
ProducerInput();
ConsumerOutput();
}

public static void ProducerInput(){
    String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam",
"lauren", "joseph"};

Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass());

KafkaProducer<String,Wikifeed> producer=new KafkaProducer<String, Wikifeed>(properties);
Random random=new Random();
IntStream.range(0,random.nextInt(10))
.mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content"))
.forEach(record -> producer.send(new ProducerRecord<String, Wikifeed>(WikifeedLambdaexample.WIKIFEED_INPUT,null,record)));
producer.flush();
}

public static void ConsumerOutput() {

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
   //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3");

KafkaConsumer<String, Long> consumer = new KafkaConsumer<String, Long>(properties, new StringDeserializer(), new LongDeserializer());
consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));

while (true) {
consumer.poll(100)
.forEach((ConsumerRecord<String, Long> consumerRecord) -> System.out.println("Topic :::" +consumerRecord.topic() + " " + "Partition:" + consumerRecord.partition()+ " " + "Key" +consumerRecord.key()+ " " + " = " + " Value:: " +consumerRecord.value()));
}
}
}
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedLambdaexample {

final static String WIKIFEED_INPUT="wikifeedInputtopic1";
final static String WIKIFEED_OUTPUT="wikifeedOutputtopic1";
final static String WIKIFEED_LAMBDA="WikiFeedLambdaexampleC2";
final static String BOOTSTRAP_SERVER="localhost:9092";
final static String COUNT_STORE="countstore1";
final static String STAT_DIR="/home/admin/Desktop/kafka_2.12.1.0.0/kafka-streams";
final static String SUM_OUTPUT_EVEN_TOPIC = "sumoutputeventopicC1";
final static String EVEN_TABLE = "sumy1";

public static void main(String[] args) {
KafkaStreams WikifeedKStreams= getWikifeed();
WikifeedKStreams.cleanUp();
WikifeedKStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(WikifeedKStreams::close));

KafkaStreams sumStreams= getEvenNumSum();
sumStreams.cleanUp();
sumStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(sumStreams::close));
}

public static KafkaStreams getWikifeed(){

Properties properties=new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class);
properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

ProcessorTopology

2018-02-22 Thread pravin kumar
Can we give the output of one processorTopology as the input to another
processorTopology?

If it is possible, how can we do it? Can anyone provide an example?
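
As far as I can tell, the first topology just needs to write its result to a
topic that the second topology uses as its source, which is what the
getWikifeed/getEvenNumSum pair in the earlier messages already does. A rough
sketch; the topic names, store name, and the two Properties objects
(firstProps/secondProps, each with its own application.id) are made up, and
Consumed here is org.apache.kafka.streams.Consumed:

// Topology A writes its output to an intermediate topic...
StreamsBuilder builderA = new StreamsBuilder();
KStream<String, Wikifeed> feeds = builderA.stream("first-input-topic");
feeds.map((key, value) -> KeyValue.pair(value.getName(), value))
     .groupByKey()
     .count(Materialized.as("first-count-store"))
     .toStream()
     .to("intermediate-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams first = new KafkaStreams(builderA.build(), firstProps);

// ...and topology B simply uses that topic as its source.
StreamsBuilder builderB = new StreamsBuilder();
KStream<String, Long> counts = builderB.stream("intermediate-topic",
        Consumed.with(Serdes.String(), Serdes.Long()));
counts.filter((name, count) -> count % 2 == 0)
      .to("second-output-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams second = new KafkaStreams(builderB.build(), secondProps);

first.start();
second.start();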


usage of deprecated method in kafka 2_12.1.0.0

2018-02-21 Thread pravin kumar
I have tried the wikifeed example with Kafka 2_12.1.0.0; the count method is
now deprecated.

Previously, in kafka_2.11-0.10.2.1, I gave count(localStateStoreName).

How do I give the state store name in Kafka 2_12.1.0.0?
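
For reference, the pattern used in the newer examples above is to pass the
store name through Materialized, roughly like this (I am not certain this is
the only intended replacement, and it needs the
org.apache.kafka.streams.kstream.Materialized import, which the attached code
does not have yet):

KTable<String, Long> kTable = inputStream
        .filter((key, value) -> value.isNew())
        .map((key, value) -> KeyValue.pair(value.getName(), value))
        .groupByKey()
        .count(Materialized.as(COUNT_STORE)); // store name goes into Materialized instead of count(COUNT_STORE)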

I have attached the code below.
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedLambdaexample {
final static String WIKIFEED_INPUT="wikifeedInput";
final static String WIKIFEED_OUTPUT="wikifeedOutput";
final static String WIKIFEED_LAMBDA="WikiFeedLambda";
final static String BOOTSTRAP_SERVERS="localhost:9092";
final static String COUNT_STORE="countstore";
final static String STAT_DIR="/home/admin/Documents/kafka_2.12.1.0.0/kafka-streams";

public static void main(String[] args) {
KafkaStreams kafkaStreams=getWikifeedStreams();
kafkaStreams.cleanUp();
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}

public static KafkaStreams getWikifeedStreams(){

Properties properties=new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class);
properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

StreamsBuilder builder= new StreamsBuilder();
KStream<String,Wikifeed> inputStream=builder.stream(WIKIFEED_INPUT);
KTable<String,Long> kTable=inputStream
.filter((key, value) -> value.isNew())
.map(((key, value) -> KeyValue.pair(value.getName(),value)))
.groupByKey()
.count(COUNT_STORE);
kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams= new KafkaStreams(builder.build(),properties);

return streams;
}
}


Doubts about multiple instance in kafka

2018-02-21 Thread pravin kumar
I have the Confluent Kafka documentation.

But I cannot understand the following passage:

"It is important to understand that Kafka Streams is not a resource
manager, but a library that “runs” anywhere its stream processing
application runs. Multiple instances of the application are executed either
on the same machine, or spread across multiple machines and tasks can
be distributed
automatically by the library
<https://docs.confluent.io/current/streams/architecture.html#streams-architecture-threads>
to those running application instances"
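
My reading of this (just an assumption) is that "multiple instances" simply
means starting the same Streams application, with the same application.id, in
more than one JVM or on more than one machine; the library then spreads the
tasks across whatever instances are running. A minimal sketch, where the
application id is made up and builder is the same topology built in each JVM:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // same id in every JVM
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaStreams instance = new KafkaStreams(builder.build(), props);
instance.start(); // running this same program again in another JVM adds a second instance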

I have tried to run it on the same machine with multiple JVMs and multiple
consumers.

Is that the correct way to run it on the same machine with multiple consumers,
or is there another way?
I have attached the code below.
package kafka.examples.MultiConsumerMultipartition.taskConfig;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 23/10/17.
 */
public class MultiPartitionMultiConsumerDriver {

public static final String CONSUMER_GROUP_ID = "multipartitionmulticonsumerdriver2";
private static final int MAX_RECORDS=1;
public static void main(String[] args) throws InterruptedException {
produceInput();
consumerOutput();
}

public static Properties getConsumerProps() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, MultiPartitionMultiConsumerUsingStream.BOOTSTRAP_SERVER);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass().getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Long().deserializer().getClass().getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10);
//properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
//properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3");

return properties;
}

public static void produceInput(){
Random random=new Random();
String[] msg={"hi","my","name","is","pravin","kumar","studied","in","madras","institute","of","technology"
,"hi","my","name","is","pravin","kumar","studied","in","good","shepherd","school","properties","put"
,"ConsumerConfig","BOOTSTRAP","SERVERS","CONFIG","Single","Partition","MultiConsumer","UsingStream"
, "BOOTSTRAP","SERVER","properties","put","StreamsConfig","DEFAULT","KEY","SERDE","CLASS","CONFIG"
,"Serdes","String","getClass","getName"};
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MultiPartitionMultiConsumerUsingStream.BOOTSTRAP_SERVER);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,Serdes.String().serializer().getClass().getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Serdes.String().serializer().getClass().getName());
KafkaProducer<String,String> producer=new KafkaProducer<String, String>(producerProps);
IntStream.range(0,MAX_RECORDS)
.forEach(record ->producer.send(new ProducerRecord<String, String>
(MultiPartitionMultiConsumerUsingStream.INPUT_TOPIC,null,msg[random.nextInt(msg.length)])));//msg[random.nextInt(msg.length)]

producer.flush();
}

public static void consumerOutput() throws InterruptedException {
Properties consumerProps = getConsumerProps();
KafkaConsumer<String,Long> consumer = new KafkaConsumer<String, Long>(consumerProps);
consumer.subscribe(Collections.singleton(MultiPartitionMultiConsumerUsingStream.OUTPUT_TOPIC));
while (true) {
Thread.sleep(5_000);
consumer.poll(Long.MAX_VALUE).forEach(consumerRecord ->
System.out.println("Topic :::" + consumerRecord.topic() + " Partition:" + consumerRecord.partition() + " Key" + consumerRecord.key() + " = Value:: " + consumerRecord.value()));
}
}
}

Doubts in KStreams

2018-02-21 Thread pravin kumar
I have studied Kafka Streams, but have not clearly understood the following:

1. Can someone explain fault tolerance?
2. I have topicA and topicB with 4 partitions each, so four tasks were created,
all in a single JVM. I need to know how this works across multiple JVMs: if one
JVM goes down, how does another JVM take over its tasks, and how is the local
state store recreated in the JVM that takes over?
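
On the state store question, one setting I came across (not sure it fully
answers it) is standby replicas: another instance keeps a warm copy of each
local state store, so a takeover does not have to rebuild the store from the
changelog topic from scratch. A config sketch, with a made-up application id:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); // keep one standby copy of each store on another instance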


WordCount Example using GlobalKStore

2017-11-01 Thread pravin kumar
I have created 3 input topics with 10 partitions each and an output topic with
10 partitions.

I did the wordcount example and stored the result in a GlobalKTable.
I initially stored the counted values in a local state store and then moved
them to the global state store.

I have attached the code here:
https://gist.github.com/Pk007790/d46236b1b5c394301f27b96891a94584

and I have supplied the inputs to the producers like this:
https://gist.github.com/Pk007790/ba934b7bcea42b8b05f4816de3cb84a0

My question is: how can I store the processed information in the global state
store without a local state store?


GlobalKStore

2017-11-01 Thread pravin kumar
I have created 3 topics with 10 partitions each.

I intend to store the processed information in a GlobalKTable.

For now, I have done it with an individual KTable written to an output topic
and then loaded into a GlobalKTable.

#//

KStreamBuilder builder=new KStreamBuilder();
KStream inputStream = builder.stream(INPUT_TOPICA,INPUT_TOPICB,INPUT_TOPICC)
.map(((key, value) -> new KeyValue<>(value, value)))
.groupByKey()
.count(INPUT_TABLE)
.toStream();
inputStream.to(Serdes.String(),Serdes.Long(),OUTPUT_TOPIC);
GlobalKTable objectObjectGlobalKTable = builder.globalTable(OUTPUT_TOPIC);
KafkaStreams kafkaStreams=new KafkaStreams(builder,props);

//#


How can I do it without a local state store?
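
With the newer StreamsBuilder API, the GlobalKTable can at least be
materialized directly from the output topic with a named store, so no extra
table is built on top of it. A sketch (the topic and store names are made up,
and I am not sure this removes the need for the counting step's own store):

StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, Long> counts = builder.globalTable(
        "wordcount-output-topic",
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("global-counts-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
// Bytes is org.apache.kafka.common.utils.Bytes; KeyValueStore is org.apache.kafka.streams.state.KeyValueStore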


Re: regarding number of Stream Tasks

2017-10-31 Thread pravin kumar
Ohhh... thank you. It's clear now.

On Tue, Oct 31, 2017 at 4:36 PM, Damian Guy <damian@gmail.com> wrote:

> Hi, the `map` when it is followed by `groupByKey` will cause a
> repartitioning of the data, so you will have your 5 tasks processing the
> input partitions and 5 tasks processing the partitions from the
> repartitioning.
>
> On Tue, 31 Oct 2017 at 10:56 pravin kumar <pk007...@gmail.com> wrote:
>
> >  I have created a stream with topic contains 5 partitions and expected to
> > create 5 stream tasks ,i got 10 tasks as
> > 0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
> >
> >
> > im doing wordcount in this example,
> >
> > here is my topology in this link: 1.
> > https://gist.github.com/Pk007790/72b0718f26e6963246e83da992b3e725
> > 2.https://gist.github.com/Pk007790/a05226007ca90cdd36c362d09d19bda6.
> >
> > On Tue, Oct 24, 2017 at 3:29 PM, Damian Guy <damian@gmail.com>
> wrote:
> >
> > > It would depend on what your topology looks like, which you haven't
> show
> > > here. But if there may be internal topics generated due to
> repartitioning
> > > which would cause the extra tasks.
> > > If you provide the topology we would be able to tell you.
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 24 Oct 2017 at 10:14 pravin kumar <pk007...@gmail.com> wrote:
> > >
> > > > I have created a stream with topic contains 5 partitions and expected
> > to
> > > > create 5 stream tasks ,i got 10 tasks as
> > > > 0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
> > > >
> > > >
> > > > my doubt is:im expected to have 5 tasks how it produced 10 tasks
> > > >
> > > > here are some logs:
> > > > [2017-10-24 10:27:35,284] INFO Kafka commitId :
> > > > cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
> > > > [2017-10-24 10:27:35,284] DEBUG Kafka consumer created
> > > > (org.apache.kafka.clients.consumer.KafkaConsumer)
> > > > [2017-10-24 10:27:35,304] INFO stream-thread
> > > >
> > > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> > > 9d8f-a1a9a8adfb7d-StreamThread-1]
> > > > State transition from CREATED to RUNNING.
> > > > (org.apache.kafka.streams.processor.internals.StreamThread)
> > > > [2017-10-24 10:27:35,306] DEBUG stream-client
> > > >
> > > > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> > > 9d8f-a1a9a8adfb7d]
> > > > Removing local Kafka Streams application data in
> > > >
> > > > /home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/
> > > SingleConsumerMultiConsumerUsingStreamx4
> > > > for application SingleConsumerMultiConsumerUsingStreamx4.
> > > > (org.apache.kafka.streams.KafkaStreams)
> > > > [2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 0_0
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 0_0
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 1_0
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting
> > obsolete
> > > > state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released
> state
> > > dir
> > > > lock for task 1_0
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired
> state
> > > dir
> > > > lock for task 0_1
> > > > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > > > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting

Re: regarding number of Stream Tasks

2017-10-31 Thread pravin kumar
I have created a stream from a topic containing 5 partitions and expected
5 stream tasks to be created, but I got 10 tasks:
0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4

I am doing wordcount in this example.

Here is my topology in these links:
1. https://gist.github.com/Pk007790/72b0718f26e6963246e83da992b3e725
2. https://gist.github.com/Pk007790/a05226007ca90cdd36c362d09d19bda6

On Tue, Oct 24, 2017 at 3:29 PM, Damian Guy <damian@gmail.com> wrote:

> It would depend on what your topology looks like, which you haven't show
> here. But if there may be internal topics generated due to repartitioning
> which would cause the extra tasks.
> If you provide the topology we would be able to tell you.
> Thanks,
> Damian
>
> On Tue, 24 Oct 2017 at 10:14 pravin kumar <pk007...@gmail.com> wrote:
>
> > I have created a stream with topic contains 5 partitions and expected to
> > create 5 stream tasks ,i got 10 tasks as
> > 0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
> >
> >
> > my doubt is:im expected to have 5 tasks how it produced 10 tasks
> >
> > here are some logs:
> > [2017-10-24 10:27:35,284] INFO Kafka commitId :
> > cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
> > [2017-10-24 10:27:35,284] DEBUG Kafka consumer created
> > (org.apache.kafka.clients.consumer.KafkaConsumer)
> > [2017-10-24 10:27:35,304] INFO stream-thread
> >
> > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> 9d8f-a1a9a8adfb7d-StreamThread-1]
> > State transition from CREATED to RUNNING.
> > (org.apache.kafka.streams.processor.internals.StreamThread)
> > [2017-10-24 10:27:35,306] DEBUG stream-client
> >
> > [SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-
> 9d8f-a1a9a8adfb7d]
> > Removing local Kafka Streams application data in
> >
> > /home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/
> SingleConsumerMultiConsumerUsingStreamx4
> > for application SingleConsumerMultiConsumerUsingStreamx4.
> > (org.apache.kafka.streams.KafkaStreams)
> > [2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 0_0
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 0_0
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 1_0
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 1_0
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 0_1
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 0_1
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 1_1
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 1_1 for task 1_1 as cleanup delay of 0 ms has passed
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Released state
> dir
> > lock for task 1_1
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Acquired state
> dir
> > lock for task 0_2
> > (org.apache.kafka.streams.processor.internals.StateDirectory)
> > [2017-10-24 10:27:35,396] INFO stream-thread [cleanup] Deleting obsolete
> > state directory 0_2 for task 0_2 as cleanup delay of 0 ms has passed

regarding number of Stream Tasks

2017-10-24 Thread pravin kumar
I have created a stream from a topic containing 5 partitions and expected
5 stream tasks to be created, but I got 10 tasks:
0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4


My doubt is: I expected to have 5 tasks, so how did it produce 10?

here are some logs:
[2017-10-24 10:27:35,284] INFO Kafka commitId :
cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser)
[2017-10-24 10:27:35,284] DEBUG Kafka consumer created
(org.apache.kafka.clients.consumer.KafkaConsumer)
[2017-10-24 10:27:35,304] INFO stream-thread
[SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d-StreamThread-1]
State transition from CREATED to RUNNING.
(org.apache.kafka.streams.processor.internals.StreamThread)
[2017-10-24 10:27:35,306] DEBUG stream-client
[SingleConsumerMultiConsumerUsingStreamx4-4dc5b303-62b4-4898-9d8f-a1a9a8adfb7d]
Removing local Kafka Streams application data in
/home/admin/Documents/kafka_2.11-0.10.2.1/kafka-streams/SingleConsumerMultiConsumerUsingStreamx4
for application SingleConsumerMultiConsumerUsingStreamx4.
(org.apache.kafka.streams.KafkaStreams)
[2017-10-24 10:27:35,311] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 0_0
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,311] INFO stream-thread [cleanup] Deleting obsolete
state directory 0_0 for task 0_0 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Released state dir
lock for task 0_0
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,322] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 1_0
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,322] INFO stream-thread [cleanup] Deleting obsolete
state directory 1_0 for task 1_0 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir
lock for task 1_0
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 0_1
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
state directory 0_1 for task 0_1 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Released state dir
lock for task 0_1
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 1_1
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,395] INFO stream-thread [cleanup] Deleting obsolete
state directory 1_1 for task 1_1 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Released state dir
lock for task 1_1
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,396] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 0_2
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,396] INFO stream-thread [cleanup] Deleting obsolete
state directory 0_2 for task 0_2 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Released state dir
lock for task 0_2
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,397] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 1_2
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,397] INFO stream-thread [cleanup] Deleting obsolete
state directory 1_2 for task 1_2 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state dir
lock for task 1_2
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 0_3
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] INFO stream-thread [cleanup] Deleting obsolete
state directory 0_3 for task 0_3 as cleanup delay of 0 ms has passed
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Released state dir
lock for task 0_3
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,398] DEBUG stream-thread [cleanup] Acquired state dir
lock for task 1_3
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2017-10-24 10:27:35,399] INFO stream-thread [cleanup] Deleting obsolete
state directory 1_3 for task 1_3 as cleanup delay of 0 ms has passed

StreamTasks

2017-10-24 Thread pravin kumar
I have created a stream from a topic containing 5 partitions and expected
5 stream tasks to be created. Instead I got:
 [admin@nms-181  ]$ ls
0_0  0_1  0_2  0_3  0_4  1_0  1_1  1_2  1_3  1_4
SingleConsumerMultiConsumerUsingStreamx4 is my application id, and the
1_0, 1_... directories contain the local state store, while the 0_0, 0_...
directories contain nothing; they are empty.

My doubts are: 1) What is the difference between 0_... and 1_...?
               2) Why is 0_... created empty?
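
One thing that seems to help here (a sketch, assuming Kafka Streams 1.0+ and
that builder is the StreamsBuilder used to build the topology): printing the
topology description lists the sub-topologies, and the task/state directory
names are <sub-topology id>_<partition>, so the 0_* and 1_* directories belong
to the two sub-topologies created by the repartitioning mentioned in the
replies above.

Topology topology = builder.build();
System.out.println(topology.describe()); // lists Sub-topology 0 and Sub-topology 1 with their sources and sinks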


Re: reg Kafka Node num

2017-10-23 Thread pravin kumar
I have one doubt about your answer: why should we differentiate the Kafka
traffic to the controller node from the normal traffic to the same node?

On Sat, Oct 21, 2017 at 1:52 AM, Eric Azama <eazama...@gmail.com> wrote:

> Kafka logs traffic to the Controller node separately from the normal
> traffic to the node. In order to differentiate it subtracts the broker id
> from 2147483647 (max int) and uses the result as the "node id" for the
> controller.
>
> On a related note, logs and metrics related to the bootstrap process seems
> to be logged with negative integers. -1 for the first bootstrap server, -2
> for the second and so on.
>
>
>
> ------ Forwarded message --
> From: pravin kumar <pk007...@gmail.com>
> To: users@kafka.apache.org
> Cc:
> Bcc:
> Date: Fri, 20 Oct 2017 14:44:04 +0530
> Subject: reg Kafka Node num
> I have run a Multiple Partition and Multiple Consumer application.Then i
> have altered the no of Partitions,
>
> in log im getting
>
> Using older server API v0 to send HEARTBEAT
> {group_id=
> altermultipartitionmulticonsumer2,group_generation_id=6,
> member_id=C2-b7b4f485-d0c3-466b-b0e7-b3de834832e9} to node 2147483647.
> (org.apache.kafka.clients.NetworkClient)
>
> i have one node  and 3 consumers and 15 partitions.
>
> my doubt is :why im getting this num node 2147483647 since i have only
> one node
>
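
(So, if I follow Eric's explanation, for a single broker with the default
broker.id of 0 the coordinator shows up as 2147483647 - 0 = 2147483647, which
matches the node id in the log above.)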


reg kafka API versions

2017-10-20 Thread pravin kumar
I have run a multiple-partition, multiple-consumer application. Then I
altered the number of partitions.

In the log I am getting:

 Using older server API v0 to send HEARTBEAT {group_id=
altermultipartitionmulticonsumer2,group_generation_id=6,
member_id=C2-b7b4f485-d0c3-466b-b0e7-b3de834832e9} to node 2147483647.
(org.apache.kafka.clients.NetworkClient)

What does "older server API v0" mean? Can anyone explain this?
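
As far as I understand, it means the broker only supports version 0 of the
HEARTBEAT request, so the client falls back to that older version. The API
versions a broker actually supports can be listed with the bundled tool,
something like:

[admin@nms-181 bin]$ sh kafka-broker-api-versions.sh --bootstrap-server localhost:9092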