Re: Error kafka-stream method punctuate in context.forward()

2016-09-22 Thread Guozhang Wang
ull) {
> statBs = new Stat();
> statBs.setCount(stat.getCount());
> statBs.setOperatorId(stat.getOperatorId());
> statBs.setNetworkPartnerId(
> stat.getNetworkPartnerId());
> statBs.setBaseStationId(stat.getBaseStationId());
> this.kvStore.put(bsKey, serializeStat(statBs));
> } else {
> statBs.setCount(statBs.getCount() + stat.getCount());
> this.kvStore.put(bsKey, serializeStat(statBs));
> }
>
>
> }
> }
> }
> }
>
> -
>
> public class ProcessorStatsByHourSupplier extends
> ProcessorStatsByTimeSupplier {
>
> public ProcessorStatsByHourSupplier(ProcessorStatsByHour processor) {
> super(processor);
> }
>
> @Override
> public Processor<String, String> get() {
> return processor;
> }
> }
>
> -
>
> public abstract class BaseProcessor implements Processor<String, String> {
>
> private static ObjectMapper objectMapper = new ObjectMapper();
>
> private TimeUnit timeUnit;
> private int countTimeUnit;
> private String countStoreName;
>
> protected ProcessorContext context;
> protected KeyValueStore<String, String> kvStore;
>
> public BaseProcessor(TimeUnit timeUnit, int countTimeUnit, String
> countStoreName) {
> this.timeUnit = timeUnit;
> this.countTimeUnit = countTimeUnit;
> this.countStoreName = countStoreName;
> }
>
> @Override
> @SuppressWarnings("unchecked")
> public void init(ProcessorContext context) {
> this.context = context;
> this.context.schedule(TimeUnit.MILLISECONDS.convert(countTimeUnit,
> timeUnit));
> this.kvStore = (KeyValueStore<String, String>)
> context.getStateStore(countStoreName);
> }
>
> @Override
> public void punctuate(long timestamp) {
> try (KeyValueIterator<String, String> iter = this.kvStore.all()) {
> System.out.println("--- " + timestamp + " --- ");
> while (iter.hasNext()) {
> System.out.println("--- pass1 --- ");
> KeyValue<String, String> entry = iter.next();
> Stat stat = deserializeStat(entry.value);
> if (stat != null) {
> System.out.println("--- pass2 --- ");
> stat.setTimestamp(timestamp);
> }
> System.out.println("--- pass3 --- ");
> System.out.println("key"+entry.key);
> System.out.println("stat"+serializeStat(stat));
> System.out.println("context"+context);
> context.forward(entry.key, serializeStat(stat));
> System.out.println("[" + entry.key + ", " +
> serializeStat(stat) + "]");
> iter.remove();
> }
> } finally {
> context.commit();
> }
> }
>
> @Override
> public void close() {
> this.kvStore.close();
> }
>
> protected static Uplink deserialize(String json) {
>     try {
>         return objectMapper.readValue(json, Uplink.class);
> } catch (IOException e) {
> System.out.println(e.getMessage());
> return new Uplink().setOperatorId("fake")
> .setNetworkPartnerId("fake").setBaseStationId("fake").
> setTimeStampProduce(60L);
>
> }
> }
>
> protected static Stat deserializeStat(String json) {
> if (json == null) {
> return null;
> }
>
> try {
> return objectMapper.readValue(json, Stat.class);
> } catch (IOException e) {
> System.out.println(e.getMessage());
> return new Stat().setOperatorId("fake").
> setNetworkPartnerId("fake").setBaseStationId("fake").setTimestamp(System.
> currentTimeMillis()).setCount(-1L);
> }
> }
>
> protected static String serializeStat(Stat stat) {
> try {
> String s = objectMapper.writeValueAsString(stat);
>
> return s;
> } catch (IOException e) {
> System.out.println(e.getMessage());
> return "{'operatorId':'fake

RE: Error kafka-stream method punctuate in context.forward()

2016-09-20 Thread Hamza HACHANI
I'm using version 0.10.0.


De : Hamza HACHANI
Envoyé : lundi 19 septembre 2016 19:20:23
À : users@kafka.apache.org
Objet : RE: Error kafka-stream method punctuate in context.forward()


Hi Guozhang,

 Here is the code for the two concerned classes

If this can help, I figured out that the instances of

ProcessorStatsByHourSupplier and ProcessorStatsByMinuteSupplier which are
returned are the same.

I think this is the problem. I tried to fix it but I was not able to do it.


Thanks Guozhang

Hamza


--

/**
 * Entry point of the per-minute statistics application.
 *
 * <p>Builds a topology that reads the "uplink" topic, passes each record through a
 * {@code ProcessorStatsByMinute} attached to an in-memory String/String key-value store,
 * and writes whatever the processor forwards to the "statsM" topic.
 */
public class StatsByMinute {

    /**
     * Configures and starts the Kafka Streams application.
     *
     * @param args command-line arguments (not read)
     * @throws Exception declared by the signature; nothing is caught here
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByMinute");
        // Where to find Kafka broker(s).
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
        // Where to find the corresponding ZooKeeper ensemble.
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("Source", "uplink");

        // The store name carries the start time, so each run uses a differently named store.
        String countStoreName = "CountsStore" + System.currentTimeMillis();

        // Supply a NEW processor on every get(). The previous code wrapped one pre-built
        // processor in a supplier that handed back that same object each time; when the
        // library asks for more than one processor, a later init() then overwrites the
        // context and store of the shared instance. That sharing is the suspected cause of
        // the NullPointerException in context.forward() discussed in this thread.
        // (On a pre-Java-8 build, an anonymous ProcessorSupplier whose get() does the same
        // "new" is the equivalent.)
        builder.addProcessor("Process",
                () -> new ProcessorStatsByMinute(1, countStoreName),
                "Source");

        builder.addStateStore(
                Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(),
                "Process");
        builder.addSink("Sink", "statsM", Serdes.String().serializer(),
                Serdes.String().serializer(), "Process");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}


-

/**
 * Entry point of the per-hour statistics application.
 *
 * <p>Builds a topology that reads the "statsM" topic, passes each record through a
 * {@code ProcessorStatsByHour} attached to an in-memory String/String key-value store,
 * and writes whatever the processor forwards to the "statsH" topic.
 */
public class StatsByHour {

    /**
     * Configures and starts the Kafka Streams application.
     *
     * @param args command-line arguments (not read)
     * @throws Exception declared by the signature; nothing is caught here
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processorByHour");
        // Where to find Kafka broker(s).
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.1.82:9094,192.168.1.89:9093,192.168.1.82:9092");
        // Where to find the corresponding ZooKeeper ensemble.
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "192.168.1.82:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("Source", "statsM");

        // The store name carries the start time, so each run uses a differently named store.
        String countStoreName = "CountsStore" + System.currentTimeMillis();

        // Supply a NEW processor on every get(). The previous code wrapped one pre-built
        // processor in a supplier that handed back that same object each time; when the
        // library asks for more than one processor, a later init() then overwrites the
        // context and store of the shared instance. That sharing is the suspected cause of
        // the NullPointerException in context.forward() discussed in this thread.
        // The debug print of the supplier object was dropped along with the wrapper.
        // (On a pre-Java-8 build, an anonymous ProcessorSupplier whose get() does the same
        // "new" is the equivalent.)
        builder.addProcessor("Process",
                () -> new ProcessorStatsByHour(3, countStoreName),
                "Source");

        builder.addStateStore(
                Stores.create(countStoreName).withStringKeys().withStringValues().inMemory().build(),
                "Process");
        builder.addSink("Sink", "statsH", Serdes.String().serializer(),
                Serdes.String().serializer(), "Process");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}


---

public class ProcessorStatsByHour extends BaseProcessor {

public ProcessorStatsByHour(int countTimeUnit, String countStoreName) {
super(TimeUnit.MINUTES, countTimeUnit, countStoreName);
}

@Override
public void process(String key, String json) {
Stat stat = deserializeStat(json);
if(stat != null) {
if ((stat.getNetworkPartnerId() == null) && 
(stat.getBaseStationId() == null)) {
String opKey = stat.getOperatorId();
Stat statOp = deserializeStat(this.kvStore.get(opKey));
if (statOp == null) {
statOp = new Stat();
statOp.setCount(stat.getCount());
statOp.setOperatorId(stat.getOperatorId());
this.kvStore.put(opKey, serializeStat(statOp));
} else {
statOp.setCount(statOp.getCount() + stat.getCount());
this.kvStore.put(opKey, serializeStat(statOp));
}
} else if (stat.getBaseStationId() == null) {
String npKey = stat.getOperatorId() + "_" + 
stat.getNetworkPartnerId();
Stat statNp = deserializeStat(this.kvStore.get(npKey));
  

RE: Error kafka-stream method punctuate in context.forward()

2016-09-20 Thread Hamza HACHANI
it.MILLISECONDS.convert(countTimeUnit, 
timeUnit));
this.kvStore = (KeyValueStore<String, String>) 
context.getStateStore(countStoreName);
}

/**
 * Walks every entry of the key-value store, stamps its {@code Stat} with the punctuation
 * timestamp, forwards the re-serialized value downstream under the same key, and removes
 * the entry through the iterator. {@code context.commit()} is called in a {@code finally}
 * block, so it runs whether or not the loop completes.
 *
 * @param timestamp the punctuation time; written into each non-null {@code Stat}
 */
@Override
public void punctuate(long timestamp) {
    try (KeyValueIterator<String, String> iter = this.kvStore.all()) {
        System.out.println("--- " + timestamp + " --- ");
        while (iter.hasNext()) {
            System.out.println("--- pass1 --- ");
            KeyValue<String, String> entry = iter.next();
            Stat stat = deserializeStat(entry.value);
            if (stat != null) {
                System.out.println("--- pass2 --- ");
                stat.setTimestamp(timestamp);
            }
            System.out.println("--- pass3 --- ");
            System.out.println("key"+entry.key);
            // Serialize once and reuse: the same value is logged twice and forwarded,
            // and the stat does not change between those uses.
            String serialized = serializeStat(stat);
            System.out.println("stat"+serialized);
            System.out.println("context"+context);
            context.forward(entry.key, serialized);
            System.out.println("[" + entry.key + ", " + serialized + "]");
            // Remove only after forwarding, so a failure in forward() leaves the entry in place.
            iter.remove();
        }
    } finally {
        context.commit();
    }
}

/** Closes the key-value store this processor obtained in {@code init}. */
@Override
public void close() {
    kvStore.close();
}

/**
 * Parses a JSON string into an {@code Uplink}.
 *
 * <p>Parsing failures are not propagated: the exception message is printed to standard
 * output and a placeholder {@code Uplink} whose identifier fields are all {@code "fake"}
 * and whose produce timestamp is {@code 60} is returned instead.
 *
 * @param json the JSON text to parse
 * @return the parsed {@code Uplink}, or the placeholder when an {@code IOException} occurs
 */
protected static Uplink deserialize(String json) {
    Uplink uplink;
    try {
        uplink = objectMapper.readValue(json, Uplink.class);
    } catch (IOException e) {
        System.out.println(e.getMessage());
        uplink = new Uplink()
                .setOperatorId("fake")
                .setNetworkPartnerId("fake")
                .setBaseStationId("fake")
                .setTimeStampProduce(60L);
    }
    return uplink;
}

/**
 * Parses a JSON string into a {@code Stat}.
 *
 * <p>A {@code null} input yields {@code null}. Parsing failures are not propagated: the
 * exception message is printed to standard output and a placeholder {@code Stat} is
 * returned, with all identifier fields set to {@code "fake"}, the current system time as
 * timestamp and a count of {@code -1}.
 *
 * @param json the JSON text to parse, may be {@code null}
 * @return the parsed {@code Stat}, {@code null} for {@code null} input, or the placeholder
 *         when an {@code IOException} occurs
 */
protected static Stat deserializeStat(String json) {
    Stat parsed = null;
    if (json != null) {
        try {
            parsed = objectMapper.readValue(json, Stat.class);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            parsed = new Stat()
                    .setOperatorId("fake")
                    .setNetworkPartnerId("fake")
                    .setBaseStationId("fake")
                    .setTimestamp(System.currentTimeMillis())
                    .setCount(-1L);
        }
    }
    return parsed;
}

    /**
     * Serializes a {@code Stat} to its JSON text.
     *
     * <p>Serialization failures are not propagated: the exception message is printed to
     * standard output and a fixed placeholder document is returned instead.
     *
     * @param stat the value to serialize
     * @return the JSON produced by the shared {@code objectMapper}, or the placeholder JSON
     *         when an {@code IOException} occurs
     */
    protected static String serializeStat(Stat stat) {
        try {
            return objectMapper.writeValueAsString(stat);
        } catch (IOException e) {
            System.out.println(e.getMessage());
            // The placeholder must itself be valid JSON (double-quoted names and strings):
            // the previous single-quoted form is rejected by a default-configured
            // ObjectMapper, so a consumer calling deserializeStat on it could not parse it.
            return "{\"operatorId\":\"fake\",\"networkPartnerId\":\"fake\","
                    + "\"baseStationId\":\"fake\",\"count\":-1,\"timestamp\":5}";
        }
    }
}



De : Guozhang Wang <wangg...@gmail.com>
Envoyé : lundi 19 septembre 2016 12:19:36
À : users@kafka.apache.org
Objet : Re: Error kafka-stream method punctuate in context.forward()

Hello Hamza,

Which Kafka version are you using with this application? Also could you
share your code skeleton of the StatsByDay processor implementation?


Guozhang


On Fri, Sep 16, 2016 at 6:58 AM, Hamza HACHANI <hamza.hach...@supcom.tn>
wrote:

> Good morning,
>
> I have a problem with a kafka-stream application.
>
> In fact I 've created already two kafka stream applications :
>
> StatsByMinute : entry topic : uplinks, out topic : statsM.
>
> StatsByHour : entry topic : statsM, out topic : statsH.
>
> StatsByDay : entry topic : statsH, out topic : statsD.
>
>
>
> The three of these applications have nearly the same structure (
> StatsByMinute and StatsByHour/StatsByDay differ only in the
> application ID, the KV store and the method process() ).
>
> StatsByDay and StatsByHour have exactly the same structure (the only
> exception is the ID parameters).
>
>
> The problem is that StatsByMinute and StatsByHour work perfectly.
>
> But this is not the case for StatsByDay, where I verified that I do
> receive data and process it (so process() works), but in the line
> context.forward in punctuate there is a problem.
>
> I get the following error :
>
>
> [2016-09-16 15:44:24,467] ERROR Streams application error during
> processing in thread [StreamThread-1]:  (org.apache.kafka.streams.
> processor.internals.StreamThread)
> java.lang.NullPointerException
> at org.apache.kafka.streams.processor.internals.
> StreamTask.forward(StreamTask.java:336)
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at com.actility.tpk.stat.BaseProcessor.punctuate(
> BaseProcessor.java:54)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.punctuate(StreamTask.java:227)
> at org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:4

Re: Error kafka-stream method punctuate in context.forward()

2016-09-19 Thread Guozhang Wang
Hello Hamza,

Which Kafka version are you using with this application? Also could you
share your code skeleton of the StatsByDay processor implementation?


Guozhang


On Fri, Sep 16, 2016 at 6:58 AM, Hamza HACHANI 
wrote:

> Good morning,
>
> I have a problem with a kafka-stream application.
>
> In fact I 've created already two kafka stream applications :
>
> StatsByMinute : entry topic : uplinks, out topic : statsM.
>
> StatsByHour : entry topic : statsM, out topic : statsH.
>
> StatsByDay : entry topic : statsH, out topic : statsD.
>
>
>
> The three of these applications have nearly the same structure (
> StatsByMinute and StatsByHour/StatsByDay differ only in the
> application ID, the KV store and the method process() ).
>
> StatsByDay and StatsByHour have exactly the same structure (the only
> exception is the ID parameters).
>
>
> The problem is that StatsByMinute and StatsByHour work perfectly.
>
> But this is not the case for StatsByDay, where I verified that I do
> receive data and process it (so process() works), but in the line
> context.forward in punctuate there is a problem.
>
> I get the following error :
>
>
> [2016-09-16 15:44:24,467] ERROR Streams application error during
> processing in thread [StreamThread-1]:  (org.apache.kafka.streams.
> processor.internals.StreamThread)
> java.lang.NullPointerException
> at org.apache.kafka.streams.processor.internals.
> StreamTask.forward(StreamTask.java:336)
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at com.actility.tpk.stat.BaseProcessor.punctuate(
> BaseProcessor.java:54)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.punctuate(StreamTask.java:227)
> at org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.maybePunctuate(StreamTask.java:212)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:407)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.runLoop(StreamThread.java:325)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:218)
> Exception in thread "StreamThread-1" java.lang.NullPointerException
> at org.apache.kafka.streams.processor.internals.
> StreamTask.forward(StreamTask.java:336)
> at org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
> at com.actility.tpk.stat.BaseProcessor.punctuate(
> BaseProcessor.java:54)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.punctuate(StreamTask.java:227)
> at org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> at org.apache.kafka.streams.processor.internals.
> StreamTask.maybePunctuate(StreamTask.java:212)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:407)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.runLoop(StreamThread.java:325)
> at org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:218)
>
>
>


-- 
-- Guozhang