Re: Problem consuming message using custom serializer

2016-09-09 Thread Shamik Bandopadhyay
Anyone?

On Tue, Sep 6, 2016 at 4:21 PM, Shamik Bandopadhyay 
wrote:

> Hi,
>
>   I'm trying to send a Java object using a Kryo object serializer. The
> producer is able to send the payload to the topic, but I'm having issues
> reading the data in the consumer. I'm using a consumer group via KafkaStream.
> The consumer code is based on the example in the Kafka documentation.
> Here's the consumer code and the corresponding error:
>
> public void run(int a_numThreads) {
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topic, new Integer(a_numThreads));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> executor = Executors.newFixedThreadPool(a_numThreads);
> int threadNumber = 0;
> for (final KafkaStream<byte[], byte[]> stream : streams) {
> executor.submit(new ConsumerGroupSerializerObject(stream, threadNumber));
> threadNumber++;
> }
> }
>
> Inside ConsumerGroupSerializerObject's run method,
>
> private KafkaStream<byte[], byte[]> m_stream;
>
> public void run() {
> ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> ByteArrayInputStream in = null;
> ObjectInputStream is = null;
> while (it.hasNext()){
> try{
> in = new ByteArrayInputStream(it.next().message());
> is = new ObjectInputStream(in);
> TextAnalysisRequest req = (TextAnalysisRequest)is.readObject();
> }catch(ClassNotFoundException ex){
> ex.printStackTrace();
> }catch(IOException ex){
> ex.printStackTrace();
> }finally{
> try{
> in.close();
> is.close();
> }catch(IOException ex){
> ex.printStackTrace();
> }
> }
> }
> }
>
> I'm getting an exception at the following line:
>
> is = new ObjectInputStream(in);
>
> java.io.StreamCorruptedException: invalid stream header: 01746573
> at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
> at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> at com.test.kafka.consumer.ConsumerGroupSerializerObject.run(
> ConsumerGroupSerializerObject.java:43)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Here's the property:
>
> Properties props = new Properties();
> props.put("zookeeper.connect", a_zookeeper);
> props.put("group.id", a_groupId);
> props.put("zookeeper.session.timeout.ms", "400");
> props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
> props.put("auto.offset.reset", "smallest");
> props.put("key.deserializer", "org.apache.kafka.common.serialization.
> StringDeserializer");
> props.put("value.deserializer", KryoReadingSerializer.class.getName());
>
> I'm new to Kafka, so I'm not entirely sure this is the right approach to
> consuming messages with a custom serializer. Moreover, I'm using KafkaStream;
> could that be an issue as well?
>
> Any pointers will be appreciated.
>
> Thanks,
> Shamik
>
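
For reference, the old high-level consumer used here (KafkaStream) does not use the
new-consumer key.deserializer/value.deserializer properties; decoders are normally
passed to createMessageStreams, and otherwise the stream hands back raw byte[].
Those bytes were written by the Kryo serializer, so reading them with
java.io.ObjectInputStream fails its header check, which is exactly the
StreamCorruptedException above. A minimal sketch of decoding them with the
producer's own deserializer instead, assuming KryoReadingSerializer implements
org.apache.kafka.common.serialization.Deserializer<TextAnalysisRequest>:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class KryoConsumerLoop implements Runnable {

    private final KafkaStream<byte[], byte[]> stream;
    // Assumption: the same custom class the producer uses, with a no-arg constructor.
    private final KryoReadingSerializer deserializer = new KryoReadingSerializer();

    public KryoConsumerLoop(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            byte[] payload = it.next().message();
            // Decode with Kryo rather than ObjectInputStream; the topic argument
            // is unused by most deserializers, so null is fine for this sketch.
            TextAnalysisRequest req = deserializer.deserialize(null, payload);
            // ... process req ...
        }
    }
}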


Time of derived records in Kafka Streams

2016-09-09 Thread Elias Levy
The Kafka Streams documentation discusses how to assign timestamps to
records received from a source topic via TimestampExtractor.  But neither the
Kafka nor the Confluent documentation on Kafka Streams explains what
timestamp is associated with a record that has been transformed.

What timestamp is associated with records that are output by stateless
transformations like map or flatMap?

What timestamp is associated with records that are output by stateful
transformations like aggregations or joins?

What about transformations on windows?

What timestamp does the Kafka publisher use, if any, when writing to an
intermediate topic via through() or a sink via to()?
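
For context, the extractor mentioned above only runs when records are consumed from
a source topic; a minimal sketch of one, assuming Kafka Streams 0.10.0 (where
extract() receives just the consumer record) and a hypothetical MyEvent payload that
carries its own event time:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class EventTimeExtractor implements TimestampExtractor {

    // Hypothetical payload type used only for this sketch.
    public interface MyEvent {
        long eventTimeMs();
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        Object value = record.value();
        if (value instanceof MyEvent) {
            return ((MyEvent) value).eventTimeMs();
        }
        // Fall back to the timestamp carried by the record itself (0.10.x message format).
        return record.timestamp();
    }
}

Such a class would be registered through the timestamp.extractor property in
StreamsConfig.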


Custom Offset Management

2016-09-09 Thread Daniel Fagnan
I’m currently wondering if it’s possible to use the internal 
`__consumer_offsets` topic to manage offsets outside the consumer group APIs. 
I’m using the low-level API to manage the consumers but I’d still like to store 
offsets in Kafka.

If it’s not possible to publish and fetch offsets from the internal topic, 
would a separate compacted log replicate most of the functionality?

Thanks,
Daniel
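
One angle that might cover this, sketched below with assumed broker, group and topic
names: a plain 0.9+ KafkaConsumer with manual partition assignment stays outside the
group-management (subscribe/rebalance) machinery, yet commitSync() and committed()
still write to and read from the internal __consumer_offsets topic on its behalf:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualOffsetStore {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");       // assumed address
        props.put("group.id", "my-offset-namespace");         // offsets are keyed by this group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("my-topic", 0);
        consumer.assign(Collections.singletonList(tp));        // manual assignment: no rebalancing

        // Store an offset in __consumer_offsets on behalf of the externally managed consumer...
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(42L)));
        // ...and fetch it back later.
        OffsetAndMetadata stored = consumer.committed(tp);
        System.out.println("stored offset: " + stored.offset());
        consumer.close();
    }
}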




Flickering Kafka Topic

2016-09-09 Thread Lawrence Weikum
Hello everyone!

We seem to be experiencing some odd behavior in Kafka and were wondering if 
anyone has come across the same issue and if you’ve been able to fix it.  
Here’s the setup:

8 brokers in the cluster.  Kafka 0.10.0.0.

One topic, and only one topic on this cluster, is having issues where ISRs 
continuously shrink and expand but never stabilize.  This starts once roughly 
50,000 messages per second come in, and the problem is exacerbated when 
the rate increases to 110,000 messages per second.  Messages are small; 
total inbound is only about 50 MB/s.

There are no errors in the logs; we just get countless messages like these:

[2016-09-09 12:54:07,147] INFO Partition [topic_a,11] on broker 4: Expanding 
ISR for partition [topic_a,11] from 4 to 4,2 (kafka.cluster.Partition)
[2016-09-09 12:54:23,070] INFO Partition [topic_a,11] on broker 4: Shrinking 
ISR for partition [topic_a,11] from 4,2 to 4 (kafka.cluster.Partition)

This topic has transient data that is unimportant after 20 minutes, so losing 
some due to a cluster shutdown isn’t that important, and we also don’t mind if 
messages are occasionally dropped.  With this in mind we have these settings:
Partitions = 16
Producer ACKs = 1
Replication factor = 2
min.insync.replicas = 1

CPU is sitting fairly idle at ~18%, and a thread dump and profile showed that 
most threads are sitting idle as well – very little contention if any.

We tried to increase the number of partitions from 16 to 24, but it seems to 
have only increased CPU usage (from 18% to 23%) and the number of 
under-replicated partitions.

Any advice or insight is appreciated. Thank you all!

Lawrence



Re: enhancing KStream DSL

2016-09-09 Thread Ara Ebrahimi
Ah, it works! Thanks! I was under the impression that these are sequentially 
chained by the DSL. I didn’t realize I can still use allRecords in parallel to 
the branches.

Ara.
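
For readers following along, a condensed sketch of the resulting pattern; the
key/value types, the CallRecord accessor and stringSerde (e.g. Serdes.String()) are
assumptions based on the snippets quoted below:

KStream<String, CallRecord>[] branches = allRecords
    .branch(
        (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
        (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType())
        // records matching neither predicate are simply dropped from the branches
    );
KStream<String, CallRecord> voiceRecords = branches[0];
KStream<String, CallRecord> dataRecords = branches[1];

// The counter is wired to the original stream, in parallel to the branches,
// not downstream of them.
allRecords
    .map((imsi, callRecord) -> new KeyValue<>("", ""))
    .countByKey(UnlimitedWindows.of("counter-window"), stringSerde)
    .print();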

> On Sep 9, 2016, at 5:27 AM, Michael Noll  wrote:
>
> Oh, my bad.
>
> Updating the third predicate in `branch()` may not even be needed.
>
> You could simply do:
>
> KStream[] branches = allRecords
>.branch(
>(imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
> ecord.getCallCommType()),
>(imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
> cord.getCallCommType())
>// Any callRecords that aren't matching any of the two
> predicates above will be dropped.
>);
>
> This would give you two branched streams instead of three:
>
>KStream voiceRecords = branches[0];
>KStream dataRecords = branches[1];
>// No third branched stream like before.
>
> Then, to count "everything" (VOICE + DATA + everything else), simply reuse
> the original `allRecords` stream.
>
>
>
> On Fri, Sep 9, 2016 at 2:23 PM, Michael Noll  wrote:
>
>> Ara,
>>
>> you have shared this code snippet:
>>
>>>   allRecords.branch(
>>>   (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
>> ecord.getCallCommType()),
>>>   (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
>> cord.getCallCommType()),
>>>   (imsi, callRecord) -> true
>>>   );
>>
>> The branch() operation partitions the allRecords KStream into three
>> disjoint streams.
>>
>> I'd suggest the following.
>>
>> First, update the third predicate in your `branch()` step to be "everything
>> but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
>> records are removed:
>>
>>
>>KStream[] branches = allRecords
>>.branch(
>>(imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
>> ecord.getCallCommType()),
>>(imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
>> cord.getCallCommType()),
>>(imsi, callRecord) -> !(callRecord.getCallCommType().
>> equalsIgnoreCase("VOICE") || callRecord.getCallCommType().e
>> qualsIgnoreCase("DATA"))
>>);
>>
>> This would give you:
>>
>>KStream voiceRecords = branches[0];
>>KStream dataRecords = branches[1];
>>KStream recordsThatAreNeitherVoiceNorData =
>> branches[2];
>>
>> Then, to count "everything" (VOICE + DATA + everything else), simply
>> reuse the original `allRecords` stream.
>>
>> -Michael
>>
>>
>>
>>
>>
>> On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi >> wrote:
>>
>>> Let’s say I have this:
>>>
>>>
>>> KStream[] branches = allRecords
>>>.branch(
>>>(imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
>>> ecord.getCallCommType()),
>>>(imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
>>> cord.getCallCommType()),
>>>(imsi, callRecord) -> true
>>>);
>>> KStream callRecords = branches[0];
>>> KStream dataRecords = branches[1];
>>> KStream callRecordCounter = branches[2];
>>>
>>> callRecordCounter
>>>.map((imsi, callRecord) -> new KeyValue<>("", ""))
>>>.countByKey(
>>>UnlimitedWindows.of("counter-window"),
>>>stringSerde
>>>)
>>>.print();
>>>
>>> Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1
>>> if data is DATA. Branch 2 is supposed to get triggered regardless of type
>>> so that I can then count stuff for a time window. BUT the
>>> problem is that branch is implemented like this:
>>>
>>> private class KStreamBranchProcessor extends AbstractProcessor {
>>>@Override
>>>public void process(K key, V value) {
>>>for (int i = 0; i < predicates.length; i++) {
>>>if (predicates[i].test(key, value)) {
>>>// use forward with childIndex here and then break the
>>> loop
>>>// so that no record is going to be piped to multiple
>>> streams
>>>context().forward(key, value, i);
>>>break;
>>>}
>>>}
>>>}
>>> }
>>>
>>> Note the break. So the counter branch is never reached. I’d like to
>>> change the behavior of branch so that all predicates are checked and no
>>> break happens, in say a branchAll() method. What’s the easiest way to add this
>>> functionality to the DSL? I tried process() but it doesn’t return KStream.
>>>
>>> Ara.
>>>
>>>
>>>
>>>
>>> 
>>>

kafkaproducer send blocks until broker is available

2016-09-09 Thread Peter Sinoros Szabo
Hi,

I'd like to use the Java Kafka producer in a non-blocking async mode.
My assumption was that as long as new messages fit into the producer's 
buffer memory, it would queue them up and send them out once the broker is 
available.
I tested a simple case in which I send messages using 
KafkaProducer.send() while the Kafka broker is not yet available (i.e. the 
broker starts later than the application).
I see that in this case send() blocks, although the documentation says 
that this method is async.
Is it possible to configure Kafka so that the producer 
buffers the messages until the broker becomes available?

Regards,
Peter
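
For what it's worth, a small sketch of how the blocking can at least be bounded and
failures observed asynchronously; the broker address, topic and values are
assumptions, and send() may still block for up to max.block.ms while fetching
metadata or waiting for buffer space:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BoundedBlockingSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");      // assumed address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // send() may still block while fetching metadata or when the buffer is full,
        // but only up to this bound (the default is 60 s).
        props.put("max.block.ms", "1000");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("my-topic", "payload"), (metadata, exception) -> {
            if (exception != null) {
                // Broker unavailable, metadata timeout, request timeout, ... is reported
                // here instead of blocking the calling thread indefinitely.
                exception.printStackTrace();
            }
        });
        producer.close();
    }
}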







Re: Performance issue with KafkaStreams

2016-09-09 Thread Caleb Welton
Same in both cases:
client.id=Test-Prototype
application.id=test-prototype

group.id=test-consumer-group
bootstrap.servers=broker1:9092,broker2:9092
zookeeper.connect=zk1:2181
replication.factor=2

auto.offset.reset=earliest


On Friday, September 9, 2016 8:48 AM, Eno Thereska  
wrote:



Hi Caleb,

Could you share your Kafka Streams configuration (i.e., StreamsConfig
properties you might have set before the test)?

Thanks
Eno


On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton  wrote:

> I have a question with respect to the KafkaStreams API.
>
> I noticed during my prototyping work that my KafkaStreams application was
> not able to keep up with the input on the stream so I dug into it a bit and
> found that it was spending an inordinate amount of time in
> org.apache.kafka.common.network.Selector.select().  Not exactly a smoking
> gun itself, so I dropped the implementation down to a single processor
> reading off a source.
>
> public class TestProcessor extends AbstractProcessor<String, String> {
> static long start = -1;
> static long count = 0;
>
> @Override
> public void process(String key, String value) {
> if (start < 0) {
> start = System.currentTimeMillis();
> }
> count += 1;
> if (count > 1000000) {
> long end = System.currentTimeMillis();
> double time = (end-start)/1000.0;
> System.out.printf("Processed %d records in %f seconds (%f
> records/s)\n", count, time, count/time);
> start = -1;
> count = 0;
> }
>
> }
>
> }
>
> ...
>
>
> TopologyBuilder topologyBuilder = new TopologyBuilder();
> topologyBuilder
> .addSource("SOURCE", stringDeserializer, StringDeserializer,
> "input")
> .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>
>
>
> Which I then ran through the KafkaStreams API, and then repeated with
> the KafkaConsumer API.
>
> Using the KafkaConsumer API:
> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>
> Using the KafkaStreams API:
> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>
>
> The profile on the KafkaStreams consisted of:
>
> 89.2% org.apache.kafka.common.network.Selector.select()
>  7.6% org.apache.kafka.clients.producer.internals.
> ProduceRequestResult.await()
>  0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>
>
> Is a 5X performance difference between Kafka Consumer and the
> KafkaStreams api expected?
>
> Are there specific things I can do to diagnose/tune the system?
>
> Thanks,
>   Caleb
>


Re: How to decommission a broker so the controller doesn't return it in the list of known brokers?

2016-09-09 Thread Jeff Widman
It looks like this problem is caused by this bug in Kafka 0.8, which was
fixed in Kafka 0.9:

https://issues.apache.org/jira/browse/KAFKA-972


On Thu, Sep 8, 2016 at 3:55 PM, Jeff Widman  wrote:

> How do I permanently remove a broker from a Kafka cluster?
>
> Scenario:
>
> I have a stable cluster of 3 brokers. I temporarily added a fourth broker
> that successfully joined the cluster. The controller returned metadata
> indicating this broker was part of the cluster.
>
> However, I never rebalanced partitions onto this broker, so this broker #4
> was never actually used, didn't join any replica lists, ISRs, etc.
>
> I later decided to remove this unused broker from the cluster. I shutdown
> the broker successfully and Zookeeper /broker/ids no longer lists broker #4.
>
> However, when my application code connects to any Kafka broker and fetches
> metadata, I get a broker list that includes this deleted broker.
>
> How do I indicate to the cluster that this broker has been permanently
> removed from the cluster and not just a transient downtime?
>
> Additionally, what's happening under the covers that causes this?
>
> I'm guessing that when I connect to a broker and ask for metadata, the
> broker checks its local cache for the controller ID, then contacts the
> controller and asks it for the list of all brokers. The controller then checks
> its cached list of brokers and returns the list of all brokers known to have
> belonged to the cluster at any point in time.
>
> I'm guessing this happens because it's not certain whether the dead broker is
> permanently removed or just experiencing transient downtime. So I'm thinking I
> just need to indicate to the controller that it needs to reset its list of
> known cluster brokers to the known live brokers in ZooKeeper. But I would not
> be surprised if something in my mental model is incorrect.
>
> This is for Kafka 0.8.2. I am planning to upgrade to 0.10 in the
> not-too-distant future, so if 0.10 handles this differently, I'm also
> curious about that.
>
>
> --
>
> *Jeff Widman*
> jeffwidman.com  | 740-WIDMAN-J (943-6265)
> <><
>



-- 

*Jeff Widman*
jeffwidman.com  | 740-WIDMAN-J (943-6265)
<><


Sink Connector feature request: SinkTask.putAndReport()

2016-09-09 Thread Dean Arnold
I have a need for volume based commits in a few sink connectors, and the
current interval-only based commit strategy creates some headaches. After
skimming the code, it appears that an alternate put() method that returned
a Map might be used to allow a sink connector to keep
Kafka up to date wrt committed offsets in the sink system, so that Kafka
might defer or reset its commit interval for topics/partitions (at least,
for the consumer used for SinkTasks). It wouldn't replace interval based
flush(), but hopefully flush() would be invoked much less frequently, and
permit the flush interval to be increased, so the sink connector can better
optimize its commit batches. Eg, the sink could almost always commit 5000
records, rather than whatever happened to be buffered up when flush() was
called, which might be very small or very large.

I'm thinking of something like

Map putAndReport(Collection record)

Put the records in the sink, and, if the operation results in committing
records to the sink system, report the largest committed offset of each
committed topic/partition. Returns null if no commit occurs.
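
A rough sketch of what that signature might look like in Java, using the offset types
Connect already passes to SinkTask.flush(); to be clear, this is the proposed method,
not an existing API:

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Proposed addition (does not exist in Connect today): deliver records like put(),
// and report the highest offset the sink actually committed per topic/partition,
// or null if this batch did not trigger a sink-side commit.
public abstract class VolumeCommittingSinkTask extends SinkTask {

    public Map<TopicPartition, OffsetAndMetadata> putAndReport(Collection<SinkRecord> records) {
        put(records);   // default behaviour: same as put(), nothing committed yet
        return null;
    }
}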

I'm not certain how a rebalance might affect the processing; since a sink
would still need to support existing interval based commits, the rebalance
(and any sink recovery) would presumably work the same.

Am I overlooking any systemic issues that would preclude such a feature ?


Asynchronous non-blocking Kafka producer without loosing messages

2016-09-09 Thread Michal Turek

Hi there,

We are preparing an update of our Kafka cluster and applications to Kafka 
0.10.x and we have some difficulties configuring the *Kafka 
producer to be asynchronous and reliably non-blocking*.


As I understand KIP-19 (1), the main intention of the Kafka developers was 
to hard-limit how long the producer may block (max.block.ms) 
instead of making the producer fully non-blocking, as one would expect from 
an asynchronous API with futures and callbacks.


The issue with max.block.ms is that a value of 0 causes an immediate 
TimeoutException if topic metadata is not available, instead of waiting for 
it asynchronously and then completing the previously returned future. 
TimeoutException is typical for the first requests after start, but probably 
(not checked) also for add/remove of a topic/partition, broker restarts, etc. 
No value other than 0 satisfies the non-blocking requirement. What to choose? 
1 minute (default), 1 second, 100 ms, 10 ms - only 0 would be right.


I found multiple Jira tickets that describe this issue (2); some of them are 
closed but none resolved.


My questions are:
- Did I miss something obvious?
- Did anyone of you workaround this issue and how?

I'm considering implementing a workaround in our applications, probably 
based on ThreadPoolExecutor.setRejectedExecutionHandler() (3) or 
BlockingQueue.offer() (4), but I wanted to ask here first in the hope of 
finding and reusing an existing solution.
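
In case it helps others, a minimal sketch of the BlockingQueue.offer() style of
workaround (4), with assumed names and capacity; application threads stay
non-blocking, and any blocking inside KafkaProducer.send() is confined to one
dedicated sender thread:

import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonBlockingProducer {

    private final BlockingQueue<ProducerRecord<String, String>> queue =
            new ArrayBlockingQueue<>(100_000);               // assumed capacity
    private final AtomicLong dropped = new AtomicLong();
    private final KafkaProducer<String, String> producer;

    public NonBlockingProducer(Properties producerProps) {
        this.producer = new KafkaProducer<>(producerProps);
        Thread sender = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ProducerRecord<String, String> record = queue.take();
                    producer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            // Delivery ultimately failed: log, count or re-enqueue here.
                        }
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "kafka-sender");
        sender.setDaemon(true);
        sender.start();
    }

    /** Never blocks; returns false (and counts a drop) when the buffer is full. */
    public boolean trySend(String topic, String value) {
        boolean accepted = queue.offer(new ProducerRecord<>(topic, value));
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }
}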


Thanks,
Michal

(1) 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
(2) https://issues.apache.org/jira/browse/KAFKA-1835, 
https://issues.apache.org/jira/browse/KAFKA-2137, 
https://issues.apache.org/jira/browse/KAFKA-3236, 
https://issues.apache.org/jira/browse/KAFKA-3450
(3) 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html#setRejectedExecutionHandler-java.util.concurrent.RejectedExecutionHandler-
(4) 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html#offer-E-


Re: Performance issue with KafkaStreams

2016-09-09 Thread Eno Thereska
Hi Caleb,

Could you share your Kafka Streams configuration (i.e., StreamsConfig
properties you might have set before the test)?

Thanks
Eno

On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton  wrote:

> I have a question with respect to the KafkaStreams API.
>
> I noticed during my prototyping work that my KafkaStreams application was
> not able to keep up with the input on the stream so I dug into it a bit and
> found that it was spending an inordinate amount of time in
> org.apache.kafka.common.network.Selector.select().  Not exactly a smoking
> gun itself, so I dropped the implementation down to a single processor
> reading off a source.
>
> public class TestProcessor extends AbstractProcessor<String, String> {
> static long start = -1;
> static long count = 0;
>
> @Override
> public void process(String key, String value) {
> if (start < 0) {
> start = System.currentTimeMillis();
> }
> count += 1;
> if (count > 1000000) {
> long end = System.currentTimeMillis();
> double time = (end-start)/1000.0;
> System.out.printf("Processed %d records in %f seconds (%f
> records/s)\n", count, time, count/time);
> start = -1;
> count = 0;
> }
>
> }
>
> }
>
> ...
>
>
> TopologyBuilder topologyBuilder = new TopologyBuilder();
> topologyBuilder
> .addSource("SOURCE", stringDeserializer, StringDeserializer,
> "input")
> .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>
>
>
> Which I then ran through the KafkaStreams API, and then repeated with
> the KafkaConsumer API.
>
> Using the KafkaConsumer API:
> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>
> Using the KafkaStreams API:
> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>
>
> The profile on the KafkaStreams consisted of:
>
> 89.2% org.apache.kafka.common.network.Selector.select()
>  7.6% org.apache.kafka.clients.producer.internals.
> ProduceRequestResult.await()
>  0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>
>
> Is a 5X performance difference between Kafka Consumer and the
> KafkaStreams api expected?
>
> Are there specific things I can do to diagnose/tune the system?
>
> Thanks,
>   Caleb
>


RE: No error to kafka-producer on broker shutdown

2016-09-09 Thread Tauzell, Dave
The send() method returns a Future.  You need to get the result at some point 
to see what happened.  A simple way would be:

m_kafkaProducer.send(prMessage).get();

-Dave
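
If blocking on get() defeats the purpose of the async producer, another option is the
Callback overload of send(); the sketch below reuses the m_kafkaProducer and prMessage
variables from the quoted test and simply logs delivery failures (what actually counts
as a failure here depends on the acks and min.insync.replicas settings):

m_kafkaProducer.send(prMessage, (metadata, exception) -> {
    if (exception != null) {
        // Invoked on the producer's I/O thread once the send has definitively failed.
        System.out.println("Error sending message : " + exception.getMessage());
    }
});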

-Original Message-
From: Agostino Calamita [mailto:agostino.calam...@gmail.com]
Sent: Friday, September 9, 2016 9:33 AM
To: users@kafka.apache.org
Subject: No error to kafka-producer on broker shutdown

Hi,
I'm writing a little test to check Kafka high availability, with 2 brokers,
1 topic with replication factor = 2 and min.insync.replicas=2.

This is the test:

 System.out.println("Building KafkaProducer...");

 KafkaProducer m_kafkaProducer = new 
KafkaProducer(propsProducer);

 System.out.println("Building ProducerRecord...");

 ProducerRecord prMessage = new 
ProducerRecord(strTopic, jsonInString.getBytes());

 long now = System.currentTimeMillis();

 

 for (int i=0; i<3; i++)
 {
try {
  for(int x=1; x<= numMessages; x++)
m_kafkaProducer.send(prMessage);

 System.out.println("Wait for 60 seconds");

  Thread.sleep(60000);

 } catch(Exception e)
 {

 System.out.println("Error sending message : "
+ e.getMessage());
 }

 }

  . . .

While the test is running, after the first pass of the "for" loop, I kill one broker, 
so only one broker remains alive.
When the test executes the second and third passes, no errors are caught by the 
kafka-producer; I only see errors in the Kafka broker logs. The test terminates 
successfully, although the messages were not really sent.

In this way my application, which uses the "async" producer, is not able to catch 
the invalid state of the Kafka brokers.

Is there a way to catch this kind of errors on kafka producers ?

Thanks.


No error to kafka-producer on broker shutdown

2016-09-09 Thread Agostino Calamita
Hi,
I'm writing a little test to check Kafka high availability, with 2 brokers,
1 topic with replication factor = 2 and min.insync.replicas=2.

This is the test:

 System.out.println("Building KafkaProducer...");

 KafkaProducer m_kafkaProducer = new
KafkaProducer(propsProducer);

 System.out.println("Building ProducerRecord...");

 ProducerRecord prMessage = new
ProducerRecord(strTopic, jsonInString.getBytes());

 long now = System.currentTimeMillis();

 

 for (int i=0; i<3; i++)
 {
try {
  for(int x=1; x<= numMessages; x++)
m_kafkaProducer.send(prMessage);

 System.out.println("Wait for 60 seconds");

  Thread.sleep(60000);

 } catch(Exception e)
 {

 System.out.println("Error sending message : "
+ e.getMessage());
 }

 }

  . . .

While the test is running, after the first pass of the "for" loop, I kill one broker,
so only one broker remains alive.
When the test executes the second and third passes, no errors are caught by the
kafka-producer; I only see errors in the Kafka broker logs. The test terminates
successfully, although the messages were not really sent.

In this way my application, which uses the "async" producer, is not able to catch
the invalid state of the Kafka brokers.

Is there a way to catch this kind of errors on kafka producers ?

Thanks.


Multiple producer instances choose same partition

2016-09-09 Thread devoss ind
Hi,

Am a newbie to Kafka and would like to use it. I have a question regarding
the same partition selection from multiple producers. Assume that I did not
specify any key while sending a message from the producer and let the producer
choose the partition in a round-robin manner. I have multiple producer nodes
(i.e., running on different machines), and if all nodes choose the same
partition for the messages to be sent, will there be any locking of the
partition log at the broker? Will there be any performance problems at the
broker while writing messages to the same partition produced by different
producers?

Thanks,
Devoss.


Re: enhancing KStream DSL

2016-09-09 Thread Michael Noll
Oh, my bad.

Updating the third predicate in `branch()` may not even be needed.

You could simply do:

KStream[] branches = allRecords
.branch(
(imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
ecord.getCallCommType()),
(imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
cord.getCallCommType())
// Any callRecords that aren't matching any of the two
predicates above will be dropped.
);

This would give you two branched streams instead of three:

KStream voiceRecords = branches[0];
KStream dataRecords = branches[1];
// No third branched stream like before.

Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.



On Fri, Sep 9, 2016 at 2:23 PM, Michael Noll  wrote:

> Ara,
>
> you have shared this code snippet:
>
> >allRecords.branch(
> >(imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
> ecord.getCallCommType()),
> >(imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
> cord.getCallCommType()),
> >(imsi, callRecord) -> true
> >);
>
> The branch() operation partitions the allRecords KStream into three
> disjoint streams.
>
> I'd suggest the following.
>
> First, update the third predicate in your `branch()` step to be "everything
> but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
> records are removed:
>
>
> KStream[] branches = allRecords
> .branch(
> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
> ecord.getCallCommType()),
> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
> cord.getCallCommType()),
> (imsi, callRecord) -> !(callRecord.getCallCommType().
> equalsIgnoreCase("VOICE") || callRecord.getCallCommType().e
> qualsIgnoreCase("DATA"))
> );
>
> This would give you:
>
> KStream voiceRecords = branches[0];
> KStream dataRecords = branches[1];
> KStream recordsThatAreNeitherVoiceNorData =
> branches[2];
>
> Then, to count "everything" (VOICE + DATA + everything else), simply
> reuse the original `allRecords` stream.
>
> -Michael
>
>
>
>
>
> On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi  > wrote:
>
>> Let’s say I have this:
>>
>>
>> KStream[] branches = allRecords
>> .branch(
>> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callR
>> ecord.getCallCommType()),
>> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRe
>> cord.getCallCommType()),
>> (imsi, callRecord) -> true
>> );
>> KStream callRecords = branches[0];
>> KStream dataRecords = branches[1];
>> KStream callRecordCounter = branches[2];
>>
>> callRecordCounter
>> .map((imsi, callRecord) -> new KeyValue<>("", ""))
>> .countByKey(
>> UnlimitedWindows.of("counter-window"),
>> stringSerde
>> )
>> .print();
>>
>> Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1
>> if data is DATA. Branch 2 is supposed to get triggered regardless of type
>> so that I can then count stuff for a time window. BUT the
>> problem is that branch is implemented like this:
>>
>> private class KStreamBranchProcessor extends AbstractProcessor {
>> @Override
>> public void process(K key, V value) {
>> for (int i = 0; i < predicates.length; i++) {
>> if (predicates[i].test(key, value)) {
>> // use forward with childIndex here and then break the
>> loop
>> // so that no record is going to be piped to multiple
>> streams
>> context().forward(key, value, i);
>> break;
>> }
>> }
>> }
>> }
>>
>> Note the break. So the counter branch is never reached. I’d like to
>> change the behavior of branch so that all predicates are checked and no
>> break happens, in say a branchAll() method. What’s the easiest way to add this
>> functionality to the DSL? I tried process() but it doesn’t return KStream.
>>
>> Ara.
>>
>>
>>
>>
>> 
>>
>>
>> 
>>
>
>
>


Re: enhancing KStream DSL

2016-09-09 Thread Michael Noll
Ara,

you have shared this code snippet:

>allRecords.branch(
>(imsi, callRecord) -> "VOICE".equalsIgnoreCase(
callRecord.getCallCommType()),
>(imsi, callRecord) -> "DATA".equalsIgnoreCase(
callRecord.getCallCommType()),
>(imsi, callRecord) -> true
>);

The branch() operation partitions the allRecords KStream into three
disjoint streams.

I'd suggest the following.

First, update the third predicate in your `branch()` step to be "everything
but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
records are removed:


KStream[] branches = allRecords
.branch(
(imsi, callRecord) -> "VOICE".equalsIgnoreCase(
callRecord.getCallCommType()),
(imsi, callRecord) -> "DATA".equalsIgnoreCase(
callRecord.getCallCommType()),
(imsi, callRecord) -> !(callRecord.getCallCommType().
equalsIgnoreCase("VOICE") || callRecord.getCallCommType().
equalsIgnoreCase("DATA"))
);

This would give you:

KStream voiceRecords = branches[0];
KStream dataRecords = branches[1];
KStream recordsThatAreNeitherVoiceNorData =
branches[2];

Then, to count "everything" (VOICE + DATA + everything else), simply reuse
the original `allRecords` stream.

-Michael





On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi 
wrote:

> Let’s say I have this:
>
>
> KStream[] branches = allRecords
> .branch(
> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(
> callRecord.getCallCommType()),
> (imsi, callRecord) -> "DATA".equalsIgnoreCase(
> callRecord.getCallCommType()),
> (imsi, callRecord) -> true
> );
> KStream callRecords = branches[0];
> KStream dataRecords = branches[1];
> KStream callRecordCounter = branches[2];
>
> callRecordCounter
> .map((imsi, callRecord) -> new KeyValue<>("", ""))
> .countByKey(
> UnlimitedWindows.of("counter-window"),
> stringSerde
> )
> .print();
>
> Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1 if
> data is DATA. Branch 2 is supposed to get triggered regardless of type
> so that I can then count stuff for a time window. BUT the problem
> is that branch is implemented like this:
>
> private class KStreamBranchProcessor extends AbstractProcessor {
> @Override
> public void process(K key, V value) {
> for (int i = 0; i < predicates.length; i++) {
> if (predicates[i].test(key, value)) {
> // use forward with childIndex here and then break the loop
> // so that no record is going to be piped to multiple
> streams
> context().forward(key, value, i);
> break;
> }
> }
> }
> }
>
> Note the break. So the counter branch is never reached. I’d like to change
> the behavior of branch so that all predicates are checked and no break
> happens, in say a branchAll() method. What’s the easiest way to add this
> functionality to the DSL? I tried process() but it doesn’t return KStream.
>
> Ara.
>
>
>
>
> 
>
>
> 
>


Re: Kafka broker crash - broker id then changed

2016-09-09 Thread cs user
Coming back to this issue, it looks like it was a result of the CentOS 7
systemd cleanup task on /tmp:

/usr/lib/tmpfiles.d/tmp.conf



#  This file is part of systemd.
#
#  systemd is free software; you can redistribute it and/or modify it
#  under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation; either version 2.1 of the License, or
#  (at your option) any later version.

# See tmpfiles.d(5) for details

# Clear tmp directories separately, to make them easier to override
v /tmp 1777 root root 10d
v /var/tmp 1777 root root 30d

# Exclude namespace mountpoints created with PrivateTmp=yes
x /tmp/systemd-private-%b-*
X /tmp/systemd-private-%b-*/tmp
x /var/tmp/systemd-private-%b-*
X /var/tmp/systemd-private-%b-*/tmp




Cheers!



On Thu, May 26, 2016 at 9:27 AM, cs user  wrote:

> Hi Ben,
>
> Thanks for responding. I can't imagine what would have cleaned /tmp up at
> that time. I don't think we have anything in place to do that; it also
> appears to have happened to both machines at the same time.
>
> It also appears that the other topics were not affected; there were still
> other files present in /tmp.
>
> Thanks!
>
> On Thu, May 26, 2016 at 9:19 AM, Ben Davison 
> wrote:
>
>> Possibly tmp got cleaned up?
>>
>> Seems like one of the log files was deleted while a producer was writing
>> messages to it:
>>
>> On Thursday, 26 May 2016, cs user  wrote:
>>
>> > Hi All,
>> >
>> > We are running Kafka version 0.9.0.1; at the time the brokers crashed
>> > yesterday we were running a 2-node cluster. This has now been
>> > increased to 3.
>> >
>> > We are not specifying a broker id and relying on kafka generating one.
>> >
>> > After the brokers crashed (at exactly the same time) we left kafka
>> stopped
>> > for a while. After kafka was started back up, the broker id's on both
>> > servers were incremented, they were 1001/1002 and they flipped to
>> > 1003/1004. This seemed to cause some problems as partitions were
>> assigned
>> > to broker id's which it believed had disappeared and so were not
>> > recoverable.
>> >
>> > We noticed that the broker id's are actually stored in:
>> >
>> > /tmp/kafka-logs/meta.properties
>> >
>> > So we set these back to what they were and restarted. Is there a reason
>> why
>> > these would change?
>> >
>> > Below are the error logs from each server:
>> >
>> > Server 1
>> >
>> > [2016-05-25 09:05:52,827] INFO [ReplicaFetcherManager on broker 1002]
>> > Removed fetcher for partitions [Topic1Heartbeat,1]
>> > (kafka.server.ReplicaFetcherManager)
>> > [2016-05-25 09:05:52,831] INFO Completed load of log Topic1Heartbeat-1
>> with
>> > log end offset 0 (kafka.log.Log)
>> > [2016-05-25 09:05:52,831] INFO Created log for partition
>> > [Topic1Heartbeat,1] in /tmp/kafka-logs with properties {compression.type ->
>> > producer, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012,
>> > min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false,
>> > min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
>> > unclean.leader.election.enable -> true, retention.bytes -> -1,
>> > delete.retention.ms -> 86400000, cleanup.policy -> delete,
>> > flush.ms -> 9223372036854775807, segment.ms -> 604800000,
>> > segment.bytes -> 1073741824, retention.ms -> 604800000,
>> > segment.index.bytes -> 10485760, flush.messages ->
>> > 9223372036854775807}. (kafka.log.LogManager)
>> > [2016-05-25 09:05:52,831] INFO Partition [Topic1Heartbeat,1] on broker
>> > 1002: No checkpointed highwatermark is found for partition
>> > [Topic1Heartbeat,1] (kafka.cluster.Partition)
>> > [2016-05-25 09:14:12,189] INFO [GroupCoordinator 1002]: Preparing to
>> > restabilize group Topic1 with old generation 0
>> > (kafka.coordinator.GroupCoordinator)
>> > [2016-05-25 09:14:12,190] INFO [GroupCoordinator 1002]: Stabilized group
>> > Topic1 generation 1 (kafka.coordinator.GroupCoordinator)
>> > [2016-05-25 09:14:12,195] INFO [GroupCoordinator 1002]: Assignment
>> received
>> > from leader for group Topic1 for generation 1
>> > (kafka.coordinator.GroupCoordinator)
>> > [2016-05-25 09:14:12,749] FATAL [Replica Manager on Broker 1002]:
>> Halting
>> > due to unrecoverable I/O error while handling produce request:
>> >  (kafka.server.ReplicaManager)
>> > kafka.common.KafkaStorageException: I/O exception in append to log
>> > '__consumer_offsets-0'
>> > at kafka.log.Log.append(Log.scala:318)
>> > at kafka.cluster.Partition$$anonfun$9.apply(Partition.
>> scala:442)
>> > at kafka.cluster.Partition$$anonfun$9.apply(Partition.
>> scala:428)
>> > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>> > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
>> > at
>> > kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)
>> > at
>> >
>> >