Re: Broker Exceptions

2015-03-06 Thread Zakee
Yes, Jiangjie, I do see lots of these errors "Starting preferred replica leader 
election for partitions" in the logs. I also see a lot of Produce request failure 
warnings with the NotLeader exception. 

I tried switching off auto.leader.rebalance (set it to false). I am still noticing 
the rebalance happening. My understanding was that the rebalance will not happen 
when this is set to false.  

Thanks
Zakee



 On Feb 25, 2015, at 5:17 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote:
 
 I don’t think num.replica.fetchers will help in this case. Increasing
 number of fetcher threads will only help in cases where you have a large
 amount of data coming into a broker and more replica fetcher threads will
 help keep up. We usually only use 1-2 for each broker. But in your case,
 it looks like leader migration is causing the issue.
 Do you see anything else in the log? Like preferred leader election?
 
 Jiangjie (Becket) Qin
 
 On 2/25/15, 5:02 PM, Zakee kzak...@netzero.net 
 mailto:kzak...@netzero.net wrote:
 
 Thanks, Jiangjie.
 
 Yes, I do see under-replicated partitions usually shooting up every hour.
 Anything that I could try to reduce it?
 
 How does num.replica.fetchers affect the replica sync? Currently have
 configured 7 on each of the 5 brokers.
 
 -Zakee
 
 On Wed, Feb 25, 2015 at 4:17 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
 These messages are usually caused by leader migration. I think as long as
 you don't see this lasting forever and a bunch of under-replicated
 partitions, it should be fine.
 
 Jiangjie (Becket) Qin
 
 On 2/25/15, 4:07 PM, Zakee kzak...@netzero.net wrote:
 
 Need to know if I should be worried about these or ignore them.
 
 I see tons of these exceptions/warnings in the broker logs, not sure
 what
 causes them and what could be done to fix them.
 
 ERROR [ReplicaFetcherThread-3-5], Error for partition [TestTopic] to
 broker
 5:class kafka.common.NotLeaderForPartitionException
 (kafka.server.ReplicaFetcherThread)
 [2015-02-25 11:01:41,785] ERROR [ReplicaFetcherThread-3-5], Error for
 partition [TestTopic] to broker 5:class
 kafka.common.NotLeaderForPartitionException
 (kafka.server.ReplicaFetcherThread)
 [2015-02-25 11:01:41,785] WARN [Replica Manager on Broker 2]: Fetch
 request
 with correlation id 950084 from client ReplicaFetcherThread-1-2 on
 partition [TestTopic,2] failed due to Leader not local for partition
 [TestTopic,2] on broker 2 (kafka.server.ReplicaManager)
 
 
 Any ideas?
 
 -Zakee
 


Re: Broker Exceptions

2015-03-06 Thread Jiangjie Qin
Yes, the rebalance should not happen in that case. That is a little bit
strange. Could you try to launch a clean Kafka cluster with
auto leader rebalance disabled and try pushing data?
When leader migration occurs, NotLeaderForPartition exception is expected.
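
While reproducing this on the clean cluster, one way to watch for lingering
under-replicated partitions is the broker's JMX gauge. A minimal sketch in
Java (the host name, JMX port and MBean name below are assumptions; adjust
them to your deployment):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Polls the UnderReplicatedPartitions gauge on one broker. A persistently
// non-zero value is the "bunch of under-replicated partitions" case; short
// spikes during leader migration are expected.
public class UnderReplicatedCheck {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi"); // assumes JMX_PORT=9999
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName gauge = new ObjectName(
                    "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
            System.out.println("UnderReplicatedPartitions = " + conn.getAttribute(gauge, "Value"));
        }
    }
}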

Jiangjie (Becket) Qin


On 3/6/15, 3:14 PM, Zakee kzak...@netzero.net wrote:

Yes, Jiangjie, I do see lots of these errors "Starting preferred replica
leader election for partitions" in the logs. I also see a lot of Produce
request failure warnings with the NotLeader exception.

I tried switching off auto.leader.rebalance (set it to false). I am still
noticing the rebalance happening. My understanding was that the rebalance will
not happen when this is set to false.

Thanks
Zakee



 On Feb 25, 2015, at 5:17 PM, Jiangjie Qin j...@linkedin.com.INVALID
wrote:
 
 I don’t think num.replica.fetchers will help in this case. Increasing
 number of fetcher threads will only help in cases where you have a large
 amount of data coming into a broker and more replica fetcher threads
will
 help keep up. We usually only use 1-2 for each broker. But in your case,
 it looks like leader migration is causing the issue.
 Do you see anything else in the log? Like preferred leader election?
 
 Jiangjie (Becket) Qin
 
 On 2/25/15, 5:02 PM, Zakee kzak...@netzero.net
mailto:kzak...@netzero.net wrote:
 
 Thanks, Jiangjie.
 
 Yes, I do see under-replicated partitions usually shooting up every hour.
 Anything that I could try to reduce it?
 
 How does num.replica.fetchers affect the replica sync? Currently have
 configured 7 on each of the 5 brokers.
 
 -Zakee
 
 On Wed, Feb 25, 2015 at 4:17 PM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:
 
 These messages are usually caused by leader migration. I think as long as
 you don't see this lasting forever and a bunch of under-replicated
 partitions, it should be fine.
 
 Jiangjie (Becket) Qin
 
 On 2/25/15, 4:07 PM, Zakee kzak...@netzero.net wrote:
 
 Need to know if I should be worried about these or ignore them.
 
 I see tons of these exceptions/warnings in the broker logs, not sure
 what
 causes them and what could be done to fix them.
 
 ERROR [ReplicaFetcherThread-3-5], Error for partition [TestTopic] to
 broker
 5:class kafka.common.NotLeaderForPartitionException
 (kafka.server.ReplicaFetcherThread)
 [2015-02-25 11:01:41,785] ERROR [ReplicaFetcherThread-3-5], Error for
 partition [TestTopic] to broker 5:class
 kafka.common.NotLeaderForPartitionException
 (kafka.server.ReplicaFetcherThread)
 [2015-02-25 11:01:41,785] WARN [Replica Manager on Broker 2]: Fetch
 request
 with correlation id 950084 from client ReplicaFetcherThread-1-2 on
 partition [TestTopic,2] failed due to Leader not local for partition
 [TestTopic,2] on broker 2 (kafka.server.ReplicaManager)
 
 
 Any ideas?
 
 -Zakee
 



Kafka to Hadoop HDFS

2015-03-06 Thread Lin Ma
Hi Kafka masters,

Wondering if there are any open source solutions to transfer messages received from
Kafka to Hadoop HDFS? Thanks.

regards,
Lin


Re: Broker Exceptions

2015-03-06 Thread Zakee
Thanks, Jiangjie, I will try with a clean cluster again.

Thanks
Zakee



 On Mar 6, 2015, at 3:51 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote:
 
 Yes, the rebalance should not happen in that case. That is a little bit
 strange. Could you try to launch a clean Kafka cluster with
  auto leader rebalance disabled and try pushing data?
 When leader migration occurs, NotLeaderForPartition exception is expected.
 
 Jiangjie (Becket) Qin
 
 
 On 3/6/15, 3:14 PM, Zakee kzak...@netzero.net wrote:
 
  Yes, Jiangjie, I do see lots of these errors "Starting preferred replica
  leader election for partitions" in the logs. I also see a lot of Produce
  request failure warnings with the NotLeader exception.
 
  I tried switching off auto.leader.rebalance (set it to false). I am still
  noticing the rebalance happening. My understanding was that the rebalance will
  not happen when this is set to false.
 
 Thanks
 Zakee
 
 
 
 On Feb 25, 2015, at 5:17 PM, Jiangjie Qin j...@linkedin.com.INVALID
 wrote:
 
 I don’t think num.replica.fetchers will help in this case. Increasing
 number of fetcher threads will only help in cases where you have a large
 amount of data coming into a broker and more replica fetcher threads
 will
 help keep up. We usually only use 1-2 for each broker. But in your case,
  it looks like leader migration is causing the issue.
 Do you see anything else in the log? Like preferred leader election?
 
 Jiangjie (Becket) Qin
 
 On 2/25/15, 5:02 PM, Zakee kzak...@netzero.net
 mailto:kzak...@netzero.net wrote:
 
 Thanks, Jiangjie.
 
  Yes, I do see under-replicated partitions usually shooting up every hour.
  Anything that I could try to reduce it?
 
 How does num.replica.fetchers affect the replica sync? Currently have
  configured 7 on each of the 5 brokers.
 
 -Zakee
 
 On Wed, Feb 25, 2015 at 4:17 PM, Jiangjie Qin
 j...@linkedin.com.invalid
 wrote:
 
  These messages are usually caused by leader migration. I think as long as
  you don't see this lasting forever and a bunch of under-replicated
  partitions, it should be fine.
 
 Jiangjie (Becket) Qin
 
 On 2/25/15, 4:07 PM, Zakee kzak...@netzero.net wrote:
 
  Need to know if I should be worried about these or ignore them.
 
 I see tons of these exceptions/warnings in the broker logs, not sure
 what
 causes them and what could be done to fix them.
 
 ERROR [ReplicaFetcherThread-3-5], Error for partition [TestTopic] to
 broker
 5:class kafka.common.NotLeaderForPartitionException
 (kafka.server.ReplicaFetcherThread)
 [2015-02-25 11:01:41,785] ERROR [ReplicaFetcherThread-3-5], Error for
 partition [TestTopic] to broker 5:class
 kafka.common.NotLeaderForPartitionException
 (kafka.server.ReplicaFetcherThread)
 [2015-02-25 11:01:41,785] WARN [Replica Manager on Broker 2]: Fetch
 request
 with correlation id 950084 from client ReplicaFetcherThread-1-2 on
 partition [TestTopic,2] failed due to Leader not local for partition
 [TestTopic,2] on broker 2 (kafka.server.ReplicaManager)
 
 
 Any ideas?
 
 -Zakee
 



Re: Possible to count for unclosed resources in process

2015-03-06 Thread Otis Gospodnetic
Hi,

I think you can look at open file descriptors (network connections use
FDs).  For example:

https://apps.sematext.com/spm-reports/s/IoQDvdT0Ig -- all good
https://apps.sematext.com/spm-reports/s/v5Hvwta7PP -- Otis restarting 2
consumers

lsof probably shows it, too.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Fri, Mar 6, 2015 at 4:04 PM, Gwen Shapira gshap...@cloudera.com wrote:

 It doesn't keep track specifically, but there are open sockets that may
 take a while to clean themselves up.

 Note that if you use the async producer and don't close the producer
 nicely, you may miss messages as the connection will close before all
 messages are sent. Guess how we found out? :)

 Similar for consumer, if you use high level consumer and don't close the
 consumer nicely, you may not acknowledge the last messages and they will be
 re-read next time the consumer starts, leading to duplicates.

 Gwen



 On Fri, Mar 6, 2015 at 12:40 PM, Stuart Reynolds s...@stureynolds.com
 wrote:

  One of our staff has been terrible at adding finally clauses to
  close kafka resources.
 
  Does the kafka scala/Java client maintain a count or list of open
  producers/consumers/client connections?
 



Re: TopicFilters and 0.9 Consumer

2015-03-06 Thread Vinoth Chandar
Hi Guozhang,

Thanks for confirming.

It should be straightforward to make subscribe(TopicFilter) and
subscribe(TopicFilter, Partition) work for added/removed topics, since this
is mostly regex matching against zookeeper metadata. But any thoughts on
how repartitioning would work?  (we need to let the consumers know there
are more partitions now, so they can spin up more consumers for these
partitions). Is there a writeup of sorts somewhere, hinting at what to
expect?

Thanks
Vinoth

On Thu, Mar 5, 2015 at 8:56 AM, Guozhang Wang wangg...@gmail.com wrote:

 Vinoth,

 Yes we do have plans to continue supporting topic filters in 0.9 consumers,
 the APIs are not there yet though.

 Guozhang

 On Thu, Mar 5, 2015 at 8:32 AM, Vinoth Chandar vin...@uber.com wrote:

  Hi guys,
 
   I was wondering what the plan in 0.9 was for the topic filters that are
   today in the high level consumer. The new API's subscribe methods
   (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html)
   seem to be working with real topic names (which I personally like).
 
   Given that the topic filters today support automatic topic discovery (e.g.
  MirrorMaker), would there be another mechanism to notify consumers of new
  topics and change in partitions? (I was thinking about an internal topic
  that provided these system changes, that consumers can listen to and
 choose
  whether or not they want to subscribe to say a newly created topic).
 
  Thanks
  Vinoth
 



 --
 -- Guozhang



Re: JMS to Kafka: Inbuilt JMSAdaptor/JMSProxy/JMSBridge (Client can speak JMS but hit Kafka)

2015-03-06 Thread Jay Kreps
I think this is great. I assume the form this would take would be a library
that implements the JMS api that wraps the existing java producer and
consumer?

Our past experience has been that trying to maintain all this stuff
centrally is too hard and tends to stifle rather than support innovation.
So if you are interested in doing this I would recommend doing a small
github project. We will definitely help promote it. Several people have
asked for it so I suspect you would definitely get some usage. I would also
love to hear how well that adaption works in practice--i.e. what percentage
of JMS features are supportable by Kafka.

-Jay

On Thu, Mar 5, 2015 at 6:30 PM, Joshi, Rekha rekha_jo...@intuit.com wrote:

 Hi,

  Kafka is a great alternative to JMS, providing high performance and
  throughput as a scalable, distributed pub-sub/commit log service.

 However there always exist traditional systems running on JMS.
  Rather than rewriting, it would be great if we just had an inbuilt
  JMSAdaptor/JMSProxy/JMSBridge by which a client can speak JMS but hit Kafka
  behind the scenes.
 Something like Chukwa's
 o.a.h.chukwa.datacollection.adaptor.jms.JMSAdaptor, which receives msg off
 JMS queue and transforms to a Chukwa chunk?

  I have come across folks talking of this need in the past as well. Is it
  considered and/or part of the roadmap?
 http://grokbase.com/t/kafka/users/131cst8xpv/stomp-binding-for-kafka

 http://grokbase.com/t/kafka/users/148dm4247q/consuming-messages-from-kafka-and-pushing-on-to-a-jms-queue

 http://grokbase.com/t/kafka/users/143hjepbn2/request-kafka-zookeeper-jms-details

  Looking for inputs on the correct way to approach this so as to retain all the good
  features of Kafka while still not rewriting the entire application. Possible?

 Thanks
 Rekha



Re: How does num.consumer.fetchers get used

2015-03-06 Thread Jiangjie Qin
Hi Tao,

Yes, your understanding is correct. We probably should update the document
to make it more clear. Could you open a ticket for it?
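
As a standalone illustration of the fetcher-id formula quoted below, the same
computation in plain Java (illustrative only, not Kafka code; Math.abs is used
here as an approximation of Kafka's Utils.abs):

// With one topic and one partition, every partition hashes to the same
// fetcher id, so only one fetcher thread per source broker is created no
// matter how large num.consumer.fetchers is.
public class FetcherIdDemo {
    static int getFetcherId(String topic, int partitionId, int numFetchers) {
        return Math.abs(31 * topic.hashCode() + partitionId) % numFetchers;
    }

    public static void main(String[] args) {
        int numFetchers = 4; // num.consumer.fetchers
        for (int partition = 0; partition < 8; partition++) {
            System.out.println("partition " + partition + " -> fetcher "
                    + getFetcherId("my-topic", partition, numFetchers));
        }
        // A single partition always yields a single fetcher id:
        System.out.println("single partition -> fetcher " + getFetcherId("my-topic", 0, numFetchers));
    }
}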

Jiangjie (Becket) Qin

On 3/6/15, 1:23 AM, tao xiao xiaotao...@gmail.com wrote:

Hi team,

After reading the source code of AbstractFetcherManager I found out that
the usage of num.consumer.fetchers may not match what is described in the
Kafka doc. My interpretation of the Kafka doc is that  the number of
fetcher threads is controlled by the value of
 property num.consumer.fetchers. If I set num.consumer.fetchers=4 there
are
4 fetcher threads in total created after consumer is initialized.

But what I found from the source code tells me a different thing. Below
code is copied from AbstractFetcherManager

private def getFetcherId(topic: String, partitionId: Int) : Int = {
  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}

def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker,
        getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
    }

    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None =>
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }

      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
        case (topicAndPartition, brokerAndInitOffset) =>
          topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }
}

 If I have one topic with one partition and num.consumer.fetchers set to 4,
there is actually only one fetcher thread created, not 4.
num.consumer.fetchers essentially sets the maximum number of fetcher
threads, not the actual number of fetcher threads. The actual number of
fetcher threads is controlled by this line of code:
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers

Is my assumption correct?

-- 
Regards,
Tao



How does num.consumer.fetchers get used

2015-03-06 Thread tao xiao
Hi team,

After reading the source code of AbstractFetcherManager I found out that
the usage of num.consumer.fetchers may not match what is described in the
Kafka doc. My interpretation of the Kafka doc is that  the number of
fetcher threads is controlled by the value of
 property num.consumer.fetchers. If I set num.consumer.fetchers=4 there are
4 fetcher threads in total created after consumer is initialized.

But what I found from the source code tells me a different thing. Below
code is copied from AbstractFetcherManager

private def getFetcherId(topic: String, partitionId: Int) : Int = {
  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}

def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker,
        getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
    }

    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None =>
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }

      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
        case (topicAndPartition, brokerAndInitOffset) =>
          topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }
}

 If I have one topic with one partition and num.consumer.fetchers set to 4,
there is actually only one fetcher thread created, not 4.
num.consumer.fetchers essentially sets the maximum number of fetcher
threads, not the actual number of fetcher threads. The actual number of
fetcher threads is controlled by this line of code:
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers

Is my assumption correct?

-- 
Regards,
Tao


Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code

2015-03-06 Thread tao xiao
Hi team,

I am having java.util.IllegalFormatConversionException when running
MirrorMaker with log level set to trace. The code is off latest trunk with
commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f

The way I bring it up is

bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
--producer.config
~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
--num.streams 1 --num.producers 1 --no.data.loss --whitelist
mm-benchmark-test\\w* --offset.commit.interval.ms 1 --queue.byte.size
1024
and set the log level to trace in tools-log4j.properties

here is the log snippet

[2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

[2015-03-07 02:04:27,211] TRACE Sending record
ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0,
value=[B@434c4f70 with callback
kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic
mm-benchmark-test partition 0
(org.apache.kafka.clients.producer.KafkaProducer)

[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

[2015-03-07 02:04:27,212] TRACE Sending record
ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67,
value=[B@21d8d293 with callback
kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic
mm-benchmark-test partition 0
(org.apache.kafka.clients.producer.KafkaProducer)

[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

[2015-03-07 02:04:27,212] TRACE Sending record
ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b,
value=[B@1acd590b with callback
kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic
mm-benchmark-test partition 0
(org.apache.kafka.clients.producer.KafkaProducer)

[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

[2015-03-07 02:04:27,212] TRACE Sending record
ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936,
value=[B@bd3671 with callback
kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic
mm-benchmark-test partition 0
(org.apache.kafka.clients.producer.KafkaProducer)

[2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on
message for topic-partition mm-benchmark-test-0:
(org.apache.kafka.clients.producer.internals.RecordBatch)

java.util.IllegalFormatConversionException: d !=
kafka.tools.MirrorMaker$UnackedOffset

at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)

at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)

at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)

at java.util.Formatter.format(Formatter.java:2488)

at java.util.Formatter.format(Formatter.java:2423)

at java.lang.String.format(String.java:2790)

at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)

at scala.collection.immutable.StringOps.format(StringOps.scala:31)

at
kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

at
kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

at kafka.utils.Logging$class.trace(Logging.scala:36)

at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)

at
kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)

at
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)

at
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)

at
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)

at
org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)

at
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)

at java.lang.Thread.run(Thread.java:745)

[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)



-- 
Regards,
Tao


Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code

2015-03-06 Thread tao xiao
A bit more context: I turned on async in producer.properties

On Sat, Mar 7, 2015 at 2:09 AM, tao xiao xiaotao...@gmail.com wrote:

 Hi team,

 I am having java.util.IllegalFormatConversionException when running
 MirrorMaker with log level set to trace. The code is off latest trunk with
 commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f

 The way I bring up is

 bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
 ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
 --producer.config
 ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
 --num.streams 1 --num.producers 1 --no.data.loss --whitelist
 mm-benchmark-test\\w* --offset.commit.interval.ms 1
 --queue.byte.size 1024
 and set the log level to trace in tools-log4j.properties

 here is the log snippet

 [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,211] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0,
 value=[B@434c4f70 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67,
 value=[B@21d8d293 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b,
 value=[B@1acd590b with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936,
 value=[B@bd3671 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on
 message for topic-partition mm-benchmark-test-0:
 (org.apache.kafka.clients.producer.internals.RecordBatch)

 java.util.IllegalFormatConversionException: d !=
 kafka.tools.MirrorMaker$UnackedOffset

 at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)

 at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)

 at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)

 at java.util.Formatter.format(Formatter.java:2488)

 at java.util.Formatter.format(Formatter.java:2423)

 at java.lang.String.format(String.java:2790)

 at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)

 at scala.collection.immutable.StringOps.format(StringOps.scala:31)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

 at kafka.utils.Logging$class.trace(Logging.scala:36)

 at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)

 at
 org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)

 at
 org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)

 at
 org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)

 at
 org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)

 at
 org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)

 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)

 at java.lang.Thread.run(Thread.java:745)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)



 --
 Regards,
 Tao




-- 
Regards,
Tao


Re: TopicFilters and 0.9 Consumer

2015-03-06 Thread Guozhang Wang
1. Partition / member changes are caught on the server side, which will
notify consumers to re-balance.
2. Topic changes are caught on the client side through metadata requests;
the client will then re-join the group with the new topic list so the
server can re-balance.
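
For illustration only, a rough sketch of what regex subscription with a
rebalance listener could look like against the new consumer interface. The
exact method signatures are an assumption here, since the 0.9 APIs were not
finalized at the time of this thread; broker addresses, the group id and the
topic pattern are placeholders:

import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Topics matching the pattern are picked up via metadata refreshes on the
// client, and partition/membership changes trigger the listener callbacks
// when the group re-balances, as described above.
public class PatternSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("group.id", "mirror-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Pattern.compile("mm-benchmark-test\\w*"), new ConsumerRebalanceListener() {
            @Override public void onPartitionsAssigned(Collection<TopicPartition> parts) {
                System.out.println("assigned: " + parts);
            }
            @Override public void onPartitionsRevoked(Collection<TopicPartition> parts) {
                System.out.println("revoked: " + parts);
            }
        });
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> record : records)
                System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset());
        }
    }
}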

Guozhang

On Fri, Mar 6, 2015 at 8:49 AM, Vinoth Chandar vin...@uber.com wrote:

 Hi Guozhang,

 Thanks for confirming.

 It should be straightforward to make subscribe(TopicFilter) and
 subscribe(TopicFilter, Partition) work for added/removed topics, since this
 is mostly regex matching against zookeeper metadata. But any thoughts on
 how repartitioning would work?  (we need to let the consumers know there
 are more partitions now, so they can spin up more consumers for these
 partitions). Is there a writeup of sorts somewhere, hinting at what to
 expect?

 Thanks
 Vinoth

 On Thu, Mar 5, 2015 at 8:56 AM, Guozhang Wang wangg...@gmail.com wrote:

  Vinoth,
 
  Yes we do have plans to continue supporting topic filters in 0.9
 consumers,
  the APIs are not there yet though.
 
  Guozhang
 
  On Thu, Mar 5, 2015 at 8:32 AM, Vinoth Chandar vin...@uber.com wrote:
 
   Hi guys,
  
    I was wondering what the plan in 0.9 was for the topic filters that are
    today in the high level consumer. The new API's subscribe methods
    (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html)
    seem to be working with real topic names (which I personally like).
  
    Given that the topic filters today support automatic topic discovery (e.g.
   MirrorMaker), would there be another mechanism to notify consumers of
 new
   topics and change in partitions? (I was thinking about an internal
 topic
   that provided these system changes, that consumers can listen to and
  choose
   whether or not they want to subscribe to say a newly created topic).
  
   Thanks
   Vinoth
  
 
 
 
  --
  -- Guozhang
 




-- 
-- Guozhang


Possible to count for unclosed resources in process

2015-03-06 Thread Stuart Reynolds
One of our staff has been terrible at adding finally clauses to
close kafka resources.

Does the kafka scala/Java client maintain a count or list of open
producers/consumers/client connections?


Re: Possible to count for unclosed resources in process

2015-03-06 Thread Gwen Shapira
It doesn't keep track specifically, but there are open sockets that may
take a while to clean themselves up.

Note that if you use the async producer and don't close the producer
nicely, you may miss messages as the connection will close before all
messages are sent. Guess how we found out? :)

Similar for consumer, if you use high level consumer and don't close the
consumer nicely, you may not acknowledge the last messages and they will be
re-read next time the consumer starts, leading to duplicates.
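
A minimal sketch of the pattern being asked for, i.e. always closing the
producer in a finally block so buffered messages get flushed before the
process exits (the new Java producer is used for illustration; broker
address and topic are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// close() blocks until previously sent records have been transmitted, which
// is what prevents the "missed messages on exit" case described above.
public class CloseNicely {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        } finally {
            producer.close(); // flushes outstanding sends before returning
        }
    }
}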

Gwen



On Fri, Mar 6, 2015 at 12:40 PM, Stuart Reynolds s...@stureynolds.com
wrote:

 One of our staff has been terrible at adding finally clauses to
 close kafka resources.

 Does the kafka scala/Java client maintain a count or list of open
 producers/consumers/client connections?



Re: Possible to count for unclosed resources in process

2015-03-06 Thread Ewen Cheslack-Postava
You could also take a thread dump to try to find them by their network
threads. For example this is how new producer network threads are named:

String ioThreadName = "kafka-producer-network-thread" +
(clientId.length() > 0 ? " | " + clientId : "");
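
A hedged sketch of counting those threads from inside the JVM instead of an
external thread dump; the producer thread-name prefix comes from the snippet
above, while other client threads (e.g. consumer fetchers) have their own
prefixes that would need to be confirmed for your client version:

// Counts live producer network I/O threads as a rough proxy for unclosed producers.
public class LeakedClientCounter {
    public static long countProducerNetworkThreads() {
        return Thread.getAllStackTraces().keySet().stream()
                .map(Thread::getName)
                .filter(name -> name.startsWith("kafka-producer-network-thread"))
                .count();
    }

    public static void main(String[] args) {
        System.out.println("Producer network threads alive: " + countProducerNetworkThreads());
    }
}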



On Fri, Mar 6, 2015 at 1:04 PM, Gwen Shapira gshap...@cloudera.com wrote:

 It doesn't keep track specifically, but there are open sockets that may
 take a while to clean themselves up.

 Note that if you use the async producer and don't close the producer
 nicely, you may miss messages as the connection will close before all
 messages are sent. Guess how we found out? :)

 Similar for consumer, if you use high level consumer and don't close the
 consumer nicely, you may not acknowledge the last messages and they will be
 re-read next time the consumer starts, leading to duplicates.

 Gwen



 On Fri, Mar 6, 2015 at 12:40 PM, Stuart Reynolds s...@stureynolds.com
 wrote:

  One of our staff has been terrible at adding finally clauses to
  close kafka resources.
 
  Does the kafka scala/Java client maintain a count or list of open
  producers/consumers/client connections?
 




-- 
Thanks,
Ewen


Re: JMS to Kafka: Inbuilt JMSAdaptor/JMSProxy/JMSBridge (Client can speak JMS but hit Kafka)

2015-03-06 Thread Joshi, Rekha
Thank you Jay for your note.

So a JMSAdaptor (or maybe MQKafkaBridge?) prototype it is, then! Will run a
feature compatibility feasibility check.

 
Thanks
Rekha



On 3/6/15, 8:45 AM, Jay Kreps jay.kr...@gmail.com wrote:

I think this is great. I assume the form this would take would be a
library
that implements the JMS api that wraps the existing java producer and
consumer?

Our past experience has been that trying to maintain all this stuff
centrally is too hard and tends to stifle rather than support innovation.
So if you are interested in doing this I would recommend doing a small
github project. We will definitely help promote it. Several people have
asked for it so I suspect you would definitely get some usage. I would
also
love to hear how well that adaption works in practice--i.e. what
percentage
of JMS features are supportable by Kafka.

-Jay

On Thu, Mar 5, 2015 at 6:30 PM, Joshi, Rekha rekha_jo...@intuit.com
wrote:

 Hi,

 Kafka is a great alternative to JMS, providing high performance and
 throughput as a scalable, distributed pub-sub/commit log service.

 However there always exist traditional systems running on JMS.
 Rather than rewriting, it would be great if we just had an inbuilt
 JMSAdaptor/JMSProxy/JMSBridge by which a client can speak JMS but hit Kafka
 behind the scenes.
 Something like Chukwa's
 o.a.h.chukwa.datacollection.adaptor.jms.JMSAdaptor, which receives msg
off
 JMS queue and transforms to a Chukwa chunk?

 I have come across folks talking of this need in the past as well. Is it
 considered and/or part of the roadmap?
 http://grokbase.com/t/kafka/users/131cst8xpv/stomp-binding-for-kafka

 
http://grokbase.com/t/kafka/users/148dm4247q/consuming-messages-from-kafk
a-and-pushing-on-to-a-jms-queue

 
http://grokbase.com/t/kafka/users/143hjepbn2/request-kafka-zookeeper-jms-
details

 Looking for inputs on the correct way to approach this so as to retain all the good
 features of Kafka while still not rewriting the entire application. Possible?

 Thanks
 Rekha




Re: Which node do you send data to?

2015-03-06 Thread Ewen Cheslack-Postava
Spencer,

Kafka (and its clients) handle failover automatically for you. When you
create a topic, you can select a replication factor. For a replication
factor n, each partition of the topic will be replicated to n different
brokers. At any given time, one of those brokers is considered the leader
for that partition and that is the only server you can communicate with to
produce new messages. That broker will then make sure the data is copied to
the other replicas. If that leader fails, one of the replicas will take
over and the producer will have to send data to that node.

But all of this should happen automatically. As long as you set the
bootstrap.servers setting (for the new producer) so that at least one of
them is always available (ideally it should just include all the brokers),
then you shouldn't have to worry about this. Of course if a node fails you
have to deal with bringing it back up/moving it to a new machine, but
producers should continue to function normally by moving traffic to the new
leader.
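
A minimal sketch of that with the new Java producer: list several brokers in
bootstrap.servers so the initial metadata lookup still succeeds if one host is
down; after that the client routes each record to the current partition leader
and follows leadership changes on its own (host names and topic are
placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// bootstrap.servers is only used to discover the cluster; it does not need
// to list every broker, just enough that one of them is reachable.
public class MultiBrokerProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092"); // placeholders
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}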



On Fri, Mar 6, 2015 at 9:05 PM, Daniel Moreno d...@max2.com wrote:

 Hi Spencer,

 You can configure your producers with a list of brokers. You can add all,
 but usually at least two of the brokers in your cluster.

 Kind Regards,

 Daniel Moreno


 On Mar 6, 2015, at 23:43, Spencer Owen so...@netdocuments.com wrote:

 I've setup a kafka cluster with 3 nodes.

 Which node should I push the data to? I would normally push to kafka01,
 but if that node goes down, then the entire cluster goes down.

 How have other people solved this. Maybe a nginx reverse proxy?



 http://stackoverflow.com/questions/28911410/which-node-should-i-push-data-to-in-a-cluster
 




-- 
Thanks,
Ewen


Re: Which node do you send data to?

2015-03-06 Thread Daniel Moreno
Hi Spencer,

You can configure your producers with a list of brokers. You can add all, but 
usually at least two of the brokers in your cluster.

Kind Regards,

Daniel Moreno


On Mar 6, 2015, at 23:43, Spencer Owen so...@netdocuments.com wrote:

I've setup a kafka cluster with 3 nodes.

Which node should I push the data to? I would normally push to kafka01, but if 
that node goes down, then the entire cluster goes down.

How have other people solved this. Maybe a nginx reverse proxy?


http://stackoverflow.com/questions/28911410/which-node-should-i-push-data-to-in-a-cluster



Which node do you send data to?

2015-03-06 Thread Spencer Owen
I've setup a kafka cluster with 3 nodes.

Which node should I push the data to? I would normally push to kafka01, but if 
that node goes down, then the entire cluster goes down.

How have other people solved this. Maybe a nginx reverse proxy?


http://stackoverflow.com/questions/28911410/which-node-should-i-push-data-to-in-a-cluster



Re: Kafka to Hadoop HDFS

2015-03-06 Thread max square
This presentation from a recent Kafka meetup in NYC describes different
approaches.
http://www.slideshare.net/gwenshap/kafka-hadoop-for-nyc-kafka-meetup?ref=http://ingest.tips/2014/10/16/notes-from-kafka-meetup/

Its companion blog post is this:
http://ingest.tips/2014/10/16/notes-from-kafka-meetup/

Cheers

Max

On Sat, Mar 7, 2015 at 12:18 AM, Daniel Moreno d...@max2.com wrote:

 You can also try the approach described here
 http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/

 Kind Regards,

 Daniel


 On Mar 6, 2015, at 23:20, Lin Ma lin...@gmail.com wrote:

 Hi Kafka masters,

 Wondering if there are any open source solutions to transfer messages received from
 Kafka to Hadoop HDFS? Thanks.

 regards,
 Lin



Re: Kafka to Hadoop HDFS

2015-03-06 Thread Daniel Moreno
You can also try the approach described here 
http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/

Kind Regards,

Daniel


On Mar 6, 2015, at 23:20, Lin Ma lin...@gmail.com wrote:

Hi Kafka masters,

Wondering if there are any open source solutions to transfer messages received from
Kafka to Hadoop HDFS? Thanks.

regards,
Lin


RE: Kafka to Hadoop HDFS

2015-03-06 Thread Aditya Auradkar
Try this. https://github.com/linkedin/camus

Aditya


From: Lin Ma [lin...@gmail.com]
Sent: Friday, March 06, 2015 8:19 PM
To: users@kafka.apache.org
Subject: Kafka to Hadoop HDFS

Hi Kafka masters,

Wondering if there are any open source solutions to transfer messages received from
Kafka to Hadoop HDFS? Thanks.

regards,
Lin


Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code

2015-03-06 Thread tao xiao
I think I worked out the root cause

Line 593 in MirrorMaker.scala

trace("Updating offset for %s to %d".format(topicPartition, offset)) should
be

trace("Updating offset for %s to %d".format(topicPartition, offset.element))
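
For anyone hitting the same trace, the exception itself is easy to reproduce:
%d only accepts integral arguments, so formatting any other object fails the
same way. A tiny standalone illustration in Java (not MirrorMaker code; the
UnackedOffset class here is a stand-in):

public class FormatRepro {
    static class UnackedOffset {
        final long element = 42L;
    }

    public static void main(String[] args) {
        UnackedOffset offset = new UnackedOffset();
        // Works: %d gets a numeric value.
        System.out.println(String.format("Updating offset for %s to %d", "topic-0", offset.element));
        // Throws java.util.IllegalFormatConversionException: %d gets the wrapper object.
        System.out.println(String.format("Updating offset for %s to %d", "topic-0", offset));
    }
}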


On Sat, Mar 7, 2015 at 2:12 AM, tao xiao xiaotao...@gmail.com wrote:

 A bit more context: I turned on async in producer.properties

 On Sat, Mar 7, 2015 at 2:09 AM, tao xiao xiaotao...@gmail.com wrote:

 Hi team,

 I am having java.util.IllegalFormatConversionException when running
 MirrorMaker with log level set to trace. The code is off latest trunk with
 commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f

 The way I bring up is

 bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
 ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
 --producer.config
 ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
 --num.streams 1 --num.producers 1 --no.data.loss --whitelist
 mm-benchmark-test\\w* --offset.commit.interval.ms 1
 --queue.byte.size 1024
 and set the log level to trace in tools-log4j.properties

 here is the log snippet

 [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,211] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0,
 value=[B@434c4f70 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67,
 value=[B@21d8d293 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b,
 value=[B@1acd590b with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936,
 value=[B@bd3671 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on
 message for topic-partition mm-benchmark-test-0:
 (org.apache.kafka.clients.producer.internals.RecordBatch)

 java.util.IllegalFormatConversionException: d !=
 kafka.tools.MirrorMaker$UnackedOffset

 at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)

 at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)

 at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)

 at java.util.Formatter.format(Formatter.java:2488)

 at java.util.Formatter.format(Formatter.java:2423)

 at java.lang.String.format(String.java:2790)

 at
 scala.collection.immutable.StringLike$class.format(StringLike.scala:266)

 at scala.collection.immutable.StringOps.format(StringOps.scala:31)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

 at kafka.utils.Logging$class.trace(Logging.scala:36)

 at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)

 at
 org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)

 at
 org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)

 at
 org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)

 at
 org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)

 at
 org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)

 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)

 at java.lang.Thread.run(Thread.java:745)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)



 --
 Regards,
 Tao




 --
 Regards,
 Tao





Re: Database Replication Question

2015-03-06 Thread Xiao
Hi, James, 

You also mentioned you want to implement another critical component for 
monitoring the data consistency between your sources and targets and correcting 
the inconsistent data. 

Unfortunately, data compare is not easy when some applications/replications 
still change your source and target tables. It might be more complicated than 
your current replication solution. 

I spent a whole year to complete such a component. I can’t tell you how my 
solution works, but I can list a few challenges to you: 

- To avoid the influences on your application, the verification components 
should not lock your table or data for reading the table. You need to consider 
how to deal with the uncommitted transactions during the data compare. 

- The end-to-end latency is not static. Some transactions might not be 
committed for a long time. Replication latency might also fluctuate. 

- The rows might keep changing during your compare. How to deal with these hot 
rows?

- The distance between your source and target could be very long. The compare 
volume could be very large. However, the compare speed should be very fast and 
your verification component should not eat a lot of CPU and memory resources. 

Best wishes, 

Xiao Li


On Mar 5, 2015, at 11:07 AM, James Cheng jch...@tivo.com wrote:

 
 On Mar 5, 2015, at 12:59 AM, Xiao lixiao1...@gmail.com wrote:
 
 Hi, James, 
 
 This design regarding the restart point has a few potential issues, I think. 
 
 - The restart point is based on the messages that you last published. The 
 message could be pruned. How large is your log.retention.hours?
 
 That's a good point. In my case, I will be publishing this into a log 
 compacted topic. My goal is that by reading the log compacted topic from 
 beginning to end, and then continually applying new items as they come in, 
 that it will always contain the final state of the database. So messages will 
 only be pruned if they are being overwritten by later ones, and in that case, 
 the later one is the one that I now care about.
 
 - If the Kafka message order is different from your log sequence, your 
 replication might lose the data. 
 
 Good point. I think it depends on use case, and also if it's possible to 
 recover lost data.
 
 I don't think I'll be losing any messages, but it's possible that, due to 
 network delays, that the items might arrive in Kafka out of order. And in my 
 use case, that would be bad because last items wins. In the log compaction 
 case, I would be able to detect that because the last item in Kafka for a 
 particular primary key would not match the state of the item in the mysql 
 database. I could repair that item by re-publishing what is in the database, 
 out to the stream.
 
 I planned to have an auditor monitor the correctness of the Kafka topic, to 
 ensure that it reflected the state of the database. I think I will now need 
 to add a correction component, that will republish any items that are 
 incorrect.
 
 First, I think you can maintain a local persistence media for recording the 
 last published message id. 
 
 That works, but there is a small window of failure that can result in 
 duplicates.
 
 If you normally:
 1) Write to Kafka
 2) Write your last published message id somewhere.
 
 You might crash between steps 1 and 2. When you come back up, you might end 
 up re-publishing your last event.
 
 -James
 
 Second, if you can add into each message a strictly increasing dense ID in 
 the producers, you can easily recover the sequences in the consumers. If so, 
 you can have multiple producers publish the messages at the same time. This 
 could improve your throughput and your consumers can easily identify if any 
 message is lost due to any reason. 
 
 Best wishes, 
 
 Xiao Li
 
 
 On Mar 4, 2015, at 4:59 PM, James Cheng jch...@tivo.com wrote:
 
 Another thing to think about is delivery guarantees. Exactly once, at least 
 once, etc.
 
 If you have a publisher that consumes from the database log and pushes out 
 to Kafka, and then the publisher crashes, what happens when it starts back 
 up? Depending on how you keep track of the database's transaction 
 id/scn/offset, you may end up re-publishing events that you already 
 published out to the kafka topic.
 
 I am also working on database replication, namely from MySQL to Kafka. I'm 
 using some of the ideas from http://ben.kirw.in/2014/11/28/kafka-patterns/ 
 in order to get exactly once processing, so that I don't have any 
 duplicates in my kafka stream.
 
 Specifically, I have the publisher write messages to a single topic (I 
 think/hope that Kafka's throughput is high enough). I include MySQL's 
 binary log coordinates into my output messages. Upon startup, I read back 
 the end of my topic to find out what messages I published. This gives me 
 2 pieces of information:
 1) The MySQL binary log coordinates, so I know where to start again.
 2) The messages that I last published, to make sure that I don't re-publish 
 them.
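
A hedged sketch of the two-step pattern discussed above: publish the change
event (with the source coordinates embedded), then persist a local restart
point. A crash between the two steps re-publishes the last event on restart,
so downstream readers must tolerate duplicates. All names, the topic and the
checkpoint file below are illustrative, not the actual implementation:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// The crash window sits between send() completing and the checkpoint write.
// Embedding the binlog position in the value lets the publisher (or a
// consumer) read back the tail of the topic and skip what is already there.
public class BinlogPublisher {
    private final KafkaProducer<String, String> producer;
    private final Path checkpoint = Paths.get("/var/lib/publisher/last-binlog-pos"); // illustrative

    BinlogPublisher(Properties producerProps) {
        this.producer = new KafkaProducer<>(producerProps);
    }

    void publish(String primaryKey, String rowJson, String binlogPosition) throws Exception {
        String value = binlogPosition + "|" + rowJson;                                // coordinates travel with the event
        producer.send(new ProducerRecord<>("db-changes", primaryKey, value)).get();   // step 1: publish
        Files.write(checkpoint, binlogPosition.getBytes(StandardCharsets.UTF_8));     // step 2: record restart point
    }
}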
 
 

Re: Database Replication Question

2015-03-06 Thread Xiao
Hi, James, 

uh… iOS Gmail app crashed. Let me resend the email to answer your concern.  

First, I am not a Kafka user. Like you, I am trying to see if Kafka can be used 
for replication-related tasks. If Kafka can provide unit of work, the design 
will be much simpler. As Guozhang said, Kafka has a plan to introduce the 
transaction concepts. Hopefully, it can be available soon.
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

Before this critical feature is available, let us discuss if database 
replication can be directly built on the current Kafka with very minor Kafka 
code changes. 

That works, but there is a small window of failure that can result in 
duplicates.
 If you normally:
 1) Write to Kafka
 2) Write your last published message id somewhere.
 You might crash between steps 1 and 2. When you come back up, you might end 
 up re-publishing your last event.

It can crash at any step and any time point. How to recover it from an 
unplanned disaster without doing a full refresh? 

1) Kafka could crash (i.e., did not gracefully stop). It could lose all the 
unflushed/non-fsynced data until the latest recovery-point. 
— I still hope Kafka can always maintain two recovery points in separate 
files. At least we can ensure one recovery point is valid if the latest one is 
corrupted during an update. 
— The consumers might already read the non-fsynced but flushed messages. 
That means, the producers should still need to republish the published 
messages. Note fileOutputStream.getFD().sync() is doing fsync and 
FileChannel.force() is doing flush.

2) The producers could crash before completing the update on “last published 
message ID”. That means, you might need to republish the last messages again.  

To me, republishing messages might be unavoidable for the current Kafka. Thus, 
the design of consumers should assume the Kafka messages might be duplicated. 
Basically, we can introduce the dense ID generated by your producers for each 
Kafka message. On each restart, we always choose the earliest of these time points: 
— The recovery points (offsets) in Kafka recovery-point file, 
— The offsets and IDs of the last message in the partitions. 
— Your local last published message IDs. 

Best wishes, 

Xiao Li

On Mar 5, 2015, at 11:07 AM, James Cheng jch...@tivo.com wrote:

 
 On Mar 5, 2015, at 12:59 AM, Xiao lixiao1...@gmail.com wrote:
 
 Hi, James, 
 
 This design regarding the restart point has a few potential issues, I think. 
 
 - The restart point is based on the messages that you last published. The 
 message could be pruned. How large is your log.retention.hours?
 
 That's a good point. In my case, I will be publishing this into a log 
 compacted topic. My goal is that by reading the log compacted topic from 
 beginning to end, and then continually applying new items as they come in, 
 that it will always contain the final state of the database. So messages will 
 only be pruned if they are being overwritten by later ones, and in that case, 
 the later one is the one that I now care about.
 
 - If the Kafka message order is different from your log sequence, your 
 replication might lose the data. 
 
 Good point. I think it depends on use case, and also if it's possible to 
 recover lost data.
 
 I don't think I'll be losing any messages, but it's possible that, due to 
 network delays, that the items might arrive in Kafka out of order. And in my 
 use case, that would be bad because last items wins. In the log compaction 
 case, I would be able to detect that because the last item in Kafka for a 
 particular primary key would not match the state of the item in the mysql 
 database. I could repair that item by re-publishing what is in the database, 
 out to the stream.
 
 I planned to have an auditor monitor the correctness of the Kafka topic, to 
 ensure that it reflected the state of the database. I think I will now need 
 to add a correction component, that will republish any items that are 
 incorrect.
 
 First, I think you can maintain a local persistence media for recording the 
 last published message id. 
 
 That works, but there is a small window of failure that can result in 
 duplicates.
 
 If you normally:
 1) Write to Kafka
 2) Write your last published message id somewhere.
 
 You might crash between steps 1 and 2. When you come back up, you might end 
 up re-publishing your last event.
 
 -James
 
 Second, if you can add into each message a strictly increasing dense ID in 
 the producers, you can easily recover the sequences in the consumers. If so, 
 you can have multiple producers publish the messages at the same time. This 
 could improve your throughput and your consumers can easily identify if any 
 message is lost due to any reason. 
 
 Best wishes, 
 
 Xiao Li
 
 
 On Mar 4, 2015, at 4:59 PM, James Cheng jch...@tivo.com wrote:
 
 Another thing to think about is delivery guarantees. Exactly once, at least 
 once, etc.
 
 If you have a