Re: Debugging message timestamps in Sarama

2018-07-25 Thread Craig Ching
This didn’t fix my problem, unfortunately.  Both timestamps are 0.




Re: Debugging message timestamps in Sarama

2018-07-24 Thread Craig Ching
Hey, thanks for that Dmitriy!  I'll have a look.

On Tue, Jul 24, 2018 at 11:18 AM Dmitriy Vsekhvalnov 
wrote:

> Not really associated with Sarama.
>
> But your issue sounds pretty much the same as one I faced some time ago and
> fixed; here it is: https://github.com/Shopify/sarama/issues/885
>
> Try using msg.BlockTimestamp instead of msg.Timestamp and see if it helps.
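
For concreteness, that workaround as a minimal sketch, assuming a
*sarama.ConsumerMessage received from consumer.Messages() (the helper name and
the epoch-zero guard are illustrative, not from issue 885; needs the time and
sarama imports):

// effectiveTimestamp prefers the per-message timestamp and falls back to
// the batch-level BlockTimestamp when the former is unset or epoch zero.
func effectiveTimestamp(msg *sarama.ConsumerMessage) time.Time {
    if msg.Timestamp.IsZero() || msg.Timestamp.Unix() <= 0 {
        return msg.BlockTimestamp
    }
    return msg.Timestamp
}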


Re: Debugging message timestamps in Sarama

2018-07-23 Thread Craig Ching
Hi Dmitry,

Are you associated with the Sarama project?  If so, understand that part of
what I want is to learn about Sarama and the Kafka message format ;)

The problem I'm having is that if I turn on:

log.message.timestamp.type=LogAppendTime

in the broker and then produce on topic1 with the console producer, I see
timestamps in the sarama client.  If I produce on topic2 with telegraf
(incidentally, I think telegraf uses a sarama producer), then I don't see
timestamps in the sarama client.  In both cases, if I consume using the
console consumer (with --property print.timestamp=true) I *do* see
timestamps.
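
For reference, here is roughly how I'm checking with the console consumer
(broker address and topic match my test code below):

bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic topic2 \
    --property print.timestamp=true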

I'm happy to debug this issue myself and submit a PR to sarama, but I am
missing some fundamentals of how to decode the kafka message format and
would really like some pointers.

Cheers,
Craig

P.S.  Here is the sarama code I'm using to test:

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    // Initialize Sarama logging.
    sarama.Logger = log.New(os.Stdout, "[Sarama] ",
        log.Ldate|log.Lmicroseconds|log.Lshortfile)

    // Catch Ctrl-C so we can shut down cleanly.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.ClientID = "consumer-test"
    config.Metadata.RefreshFrequency = 5 * time.Minute
    config.Metadata.Full = true
    // config.Version = sarama.V0_10_0_0
    config.Version = sarama.V1_1_0_0
    // config.Version = sarama.V0_10_2_1
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    brokers := []string{"localhost:9092"}
    // brokers := []string{"measurement-kafka-broker.service.tgt-pe-prod-ttc.consul.c-prod.ost.cloud.target.internal:9092"}

    client, err := sarama.NewConsumer(brokers, config)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := client.Close(); err != nil {
            panic(err)
        }
    }()

    // topic := "topic1"
    topic := "topic2"
    // topic := "metric-influx-measurement"
    // Partition 0 is hard-coded for this test; a real consumer would discover
    // partitions with client.Partitions(topic).
    consumer, err := client.ConsumePartition(topic, 0, sarama.OffsetOldest)
    if err != nil {
        panic(err)
    }
    defer func() {
        if err := consumer.Close(); err != nil {
            panic(err)
        }
    }()

    // Count how many messages were processed.
    msgCount := 0

    // Interrupt handling lives in main below: a bare break inside this select
    // would only exit the select, not the for loop.
    go func() {
        for {
            select {
            case err := <-consumer.Errors():
                fmt.Println(err)
            case msg := <-consumer.Messages():
                msgCount++
                fmt.Println(msg.Timestamp)
                fmt.Println("Received message", string(msg.Key), string(msg.Value))
            }
        }
    }()

    <-signals
    fmt.Println("Interrupt is detected")
}


On Mon, Jul 23, 2018 at 10:43 AM Dmitriy Vsekhvalnov 
wrote:

> Hey Craig,
>
> what exact problem are you having with the Sarama client?


Debugging message timestamps in Sarama

2018-07-23 Thread Craig Ching
Hi!

I'm working on debugging a problem with how message timestamps are handled
in the sarama client.  In some cases, the sarama client won't associate a
timestamp with a message while the kafka console consumer does.  I've found
the documentation on the message format here:

https://kafka.apache.org/documentation/#messageformat

But the information there is very sparse.  For instance, what are
'firstTimestamp' and 'maxTimestamp'?  It seems that when I'm debugging
sarama, firstTimestamp is set to -1 and maxTimestamp appears to be the
timestamp I want.  Is there some state about the message that I need to
understand in order to have maxTimestamp be used?  Any further
documentation or guidance on this would be very helpful!
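
To make my current guess concrete, here is the rule I *think* the batch header
implies, sketched in Go (pieced together from debugging, not from any spec
text; the parameter names just mirror the fields above, and I may well be
wrong):

func recordTimestamp(firstTimestamp time.Time, delta time.Duration,
    logAppendTime bool, maxTimestamp time.Time) time.Time {
    if logAppendTime {
        // Broker-assigned: the whole batch shares the append time.
        return maxTimestamp
    }
    // Producer-assigned: each record is an offset from the batch's
    // first timestamp.
    return firstTimestamp.Add(delta)
}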

On another note, I am trying to debug this through the Scala/Java console
consumer, but I'm having a hard time getting IntelliJ set up.  Is there
anything special I need, or documentation I should follow, to set this up for
debugging?


Re: Problem consuming from broker 1.1.0

2018-07-21 Thread Craig Ching
Hi Senthil,

We never did get to the root cause, unfortunately.  Our Kafka cluster is
currently owned by a different team, and it wasn't a priority for them to
figure that out, or at least to communicate the problem to us :(

Cheers,
Craig

On Thu, Jul 19, 2018 at 8:13 AM SenthilKumar K 
wrote:

> Hi Craig Ching,
>
> Regarding: *We did end up turning on debug logs for the console consumer and
> found that one broker seemed to be having problems; it would lead to
> timeouts communicating with it.  After restarting that broker, things
> sorted themselves out.*
>
> We had a similar problem on a prod cluster, and I'm trying to figure out the
> root cause of why the broker stopped responding.  Please see my earlier email
> with the subject *"Kafka Broker Not Responding"*, where I described the
> problem in detail.
>
> Curious to know: were you able to figure out the reason for the broker
> failure?  Of course, turning it off and on is not the ideal solution.
>
> --Senthil


Re: Problem consuming from broker 1.1.0

2018-06-13 Thread Craig Ching
Hi Manikumar!

Thanks for responding!  Sorry it took me so long to get back!

We did end up turning on debug logs for the console consumer and found that one
broker seemed to be having problems; it would lead to timeouts communicating
with it.  After restarting that broker, things sorted themselves out. However,
I always hate the “turn it off and on” solution ;)  It’s interesting to me that
the 1.1.0 consumer, though it reported timeouts in the logs, never had a
problem; it seemed able to recover, whereas the 1.0.1 consumer (talking to a
1.1.0 cluster, remember) couldn’t.  Does any of this make sense?  I’m happy to
provide more details and logs if necessary, as I’d like to understand the root
problem here.

Thanks again!

Cheers,
Craig

> On Jun 13, 2018, at 12:43 AM, Manikumar  wrote:
> 
> Can you post consumer debug logs?
> You can enable console consumer debug logs here:
> kafka/config/tools-log4j.properties
> 
> On Wed, Jun 13, 2018 at 9:55 AM Craig Ching  wrote:
> 
>> Hi!
>> 
>> We’re having a problem with a new Kafka cluster at 1.1.0.  The problem is,
>> in general, that consumers can’t consume from the upgraded broker (the old
>> broker was 0.11, I think).  The easiest recipe I have for reproducing the
>> problem: a freshly downloaded Kafka 1.0.1 console consumer can’t consume
>> from the 1.1.0 cluster, while a 1.1.0 console consumer can.  We’re invoking
>> the console consumer like this:
>> 
>> bin/kafka-console-consumer.sh \
>>    --bootstrap-server [kafka server] \
>>    --topic [our topic] \
>>    --max-messages 3
>> 
>> That works for a 1.1.0 console consumer, but not for 1.0.1.  However, if
>> we change that to:
>> 
>> bin/kafka-console-consumer.sh \
>>    --zookeeper [zookeeper server] \
>>    --topic [our topic] \
>>    --max-messages 3
>> 
>> Then it works for 1.0.1.
>> 
>> I was wondering: is the ZooKeeper schema published for 1.1.0?  I have a
>> feeling that maybe something is wrong in ZooKeeper.  I know earlier
>> versions of Kafka used to publish the ZK schema; could there be a problem
>> in ZK?  If so, what might I look for?
>> 
>> Any help is greatly appreciated!
>> 
>> Cheers,
>> Craig
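
For reference, the tools-log4j.properties change Manikumar describes would
look roughly like this (a sketch; the file's exact default contents may differ
by Kafka version):

# kafka/config/tools-log4j.properties
# Raise the root level from the shipped WARN to DEBUG for the console tools.
log4j.rootLogger=DEBUG, stderr

log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stderr.Target=System.err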



High CPU Usage on 0.8.2.1

2015-07-16 Thread Craig Ching
Hi all,

We're experiencing high CPU usage on some of our development machines when
Kafka should be idle.  These are single-node Kafka instances used only for
development; so far we haven't seen any problems on our production system.

It doesn't seem to happen until we deploy a Storm topology, and the more
topologies we deploy, the worse it gets.  The more partitions the topics have,
the worse it gets, too.

For instance, on my system, if I change the number of partitions from 3 to 1
for ~10 topics, CPU usage decreases from ~89% to ~50%.

I'll admit I'm still getting up to speed on Kafka, so maybe having 3
partitions on our development machines is crazy; but still, even 1
partition causes this excessive CPU usage.

I have profiled using jvisualvm, and the hotspot seems to be
RequestChannel.receiveRequest().  I've tinkered with the code a bit, but
haven't been able to effect any change.

Does anyone have any experience with such a scenario?  Is there anything I
can provide that would help troubleshoot this?

I appreciate any help!

Cheers,
Craig


Re: High CPU Usage on 0.8.2.1

2015-07-16 Thread Craig Ching
So I found the culprit.  On the storm-kafka spout there is a setting:

topology.sleep.spout.wait.strategy.time.ms

This setting defaults to 1 ms; changing it dramatically decreases CPU usage in
both Kafka and Storm.  Does anyone here know anything about this?  Should this
setting be able to peg Kafka's CPU usage?  I'm also going to ask on the Storm
list about it.
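
The change is roughly this, sketched as a storm.yaml entry (100 ms is just an
illustrative value, not a recommendation):

# storm.yaml
# Raise the spout's empty-poll sleep from the 1 ms default so the
# kafka spout stops busy-polling.
topology.sleep.spout.wait.strategy.time.ms: 100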

Cheers,
Craig
