RE: Simple Consumer and offsets
If I may use the same thread to discuss the exact same issue: assuming one can store the offset in an external location (Redis/DB, etc.), along with the rest of the state that a program requires, wouldn't it be possible to manage things such that you use the High Level API with auto-commit turned off, do your custom offset management, and then make the Kafka commit API call (probably delayed, to give ZooKeeper a breather)? That way, in the failure scenario, the high-level consumer offset would ALWAYS be no larger than what is actually valid, and you can skip forward and avoid using the simple consumer. I assume one needs the simple consumer in the offset management use case only if we want to skip back to an older offset / use Kafka for storing offsets? I was trying to handle the consumer failure scenario while avoiding the simple consumer and all the complexity it entails. Does this work, or is there anything wrong with this picture? Thanks, Arun On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: > We are using the High Level Consumer API to interact with Kafka for our > normal use cases. > > However, on consumer restart in the case of consumer failures, we want > to be able to manually reset offsets in certain situations. > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) It > looked like instantiating a SimpleConsumer just to reset offsets on restart > was a viable option, while continuing to use the High Level Consumer for our > normal operations. Not sure if there is a better way that is compatible > across 0.8.1 and 0.8.2. > -Suren > > > On Thursday, February 19, 2015 10:25 AM, Joel Koshy > wrote: > > > Not sure what you mean by using the SimpleConsumer on failure > recovery. Can you elaborate on this? > > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: > > Haven't used either one now. Sounds like 0.8.2.1 will help. 
> > We are using the High Level Consumer generally but are thinking to use the > > SimpleConsumer on failure recovery to set the offsets. > > Is that the recommended approach for this use case? > > Thanks. > > -Suren > > > > > > On Thursday, February 19, 2015 9:40 AM, Joel Koshy > > wrote: > > > > > > Are you using it from Java or Scala? i.e., are you using the > >javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer > > > > In 0.8.2 javaapi we explicitly set version 0 of the > > OffsetCommitRequest/OffsetFetchRequest which means it will > > commit/fetch to/from ZooKeeper only. If you use the scala API you > > can create an OffsetCommitRequest with version set to 1 (which will > > allow you to commit to Kafka). > > > > Since we are doing an 0.8.2.1 release we will make the above more > > consistent. i.e., you can create OffsetCommitRequests with version 1 > > even from the javaapi. I will be updating the documentation on this > > to make it clearer. > > > > Thanks, > > > > Joel > > > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > > > Joel, > > > Looking at SimpleConsumer in the 0.8.2 code, it is using > > > OffsetCommitRequest and sending that over to a broker. > > > Is the broker storing that in ZK? > > > -Suren > > > > > > > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy > > > wrote: > > > > > > > > > Hi Chris, > > > > > > In 0.8.2, the simple consumer Java API supports > > > committing/fetching offsets that are stored in ZooKeeper. You > > > don't need to issue any ConsumerMetadataRequest for this. > > > Unfortunately, the API currently does not support fetching offsets that > > > are stored in Kafka. > > > > > > Thanks, > > > > > > Joel > > > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: > > > > Hi, > > > > > > > > I am still using 0.8.1.1 because of the CPU use concerns. 
> > > > > > > > I'm confused about why the SimpleConsumer has: > > > > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > > > > > and > > > > > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > > > > > > > but no way that I can see to issue a ConsumerMetadataRequest, > > > > which is what I think when restarting my consumers so that they > > > > can begin working where they last left off (in the event that > > > > they were stopped for a while then restarted some time later, > > > > and new messages had come in). > > > > > > > > The fetchOffsets() works on time, usually it looks like you send > > > > it Earliest or Latest (beginning or end of what's currently in > > > > the stream). > > > > > > > > I realize the documentation says this: > > > > > > > > > > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a > > > > > significant amount of work not needed in the Consumer Groups: > > > > > > > > > >1. You must keep track of the offsets in your application to know > > > > > where you left off consuming. > > > > > > > > > > But that's not really quit
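The scheme Arun describes at the top of this thread can be sketched as a tiny resolution rule applied at restart (function and variable names are hypothetical, not Kafka client API calls): offsets go to the external store on every message, while the Kafka commit is delayed, so Kafka's committed offset can only trail the external one.

```python
def resume_offset(external_offset, kafka_offset):
    """Pick the offset to resume from after a consumer restart.

    Under the scheme described above, the external store is written on
    every message while the Kafka (ZooKeeper-backed) commit is delayed,
    so kafka_offset should never exceed external_offset; skipping
    forward to the external offset avoids reprocessing without needing
    the SimpleConsumer to rewind.
    """
    if kafka_offset > external_offset:
        # Shouldn't happen under this scheme; it would mean the external
        # store lost writes, in which case trusting Kafka is the safer bet.
        return kafka_offset
    return external_offset
```

For example, `resume_offset(120, 100)` skips the high-level consumer forward to 120 instead of reprocessing from Kafka's stale committed offset of 100.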
About Symantec's encryption-thru-Kafka proof of concept
Hi Folks, At the recent Kafka Meetup in Mountain View there was interest expressed about the encryption through Kafka proof of concept that Symantec did a few months ago, so I have created a blog post with some details about it. You can find that here: http://goo.gl/sjYGWN Let me know if you have any thoughts or questions. Thanks, Jim -- Jim Hoagland, Ph.D. Sr. Principal Software Engineer Big Data Analytics Team Cloud Platform Engineering Symantec Corporation http://cpe.symantec.com
Re: Default MirrorMaker not copying over from source to target
Looks like you only have 4 messages in your topic and no more messages got sent: [2015-02-19 20:09:34,661] DEBUG initial fetch offset of consolemm:0: fetched offset = 4: consumed offset = 4 is 4 (kafka.consumer.PartitionTopicInfo). You can try sending more messages to the topic, or give the MM a different consumer group id and set auto.offset.reset=smallest. On Friday, February 20, 2015, Alex Melville wrote: > Tao, > > > I updated the mirrorconsumer.properties config file as you suggested, and > upped the MM's log level to DEBUG. I have the output of the DEBUG logger > here in this pastebin, if you could take a minute to look for anything in > its contents that would indicate a problem that would be extremely helpful. > Note that my servers' hostnames are of the form ad-010X or ba-0X where X is > some integer between 1 and 4. > > http://pastebin.com/rBsxx15A > > When I run the mirrormaker and then spin up a console consumer to read from > the source cluster, I get 0 messages consumed. > > > Alex > > On Sun, Feb 15, 2015 at 3:00 AM, tao xiao > wrote: > > > Alex, > > > > Are you sure you have data continually being sent to the topic in source > > cluster after you bring up MM? By default auto.offset.reset=largest in MM > > consumer config which means MM only fetches the largest offset if the > > consumer group has no initial offset in zookeeper. > > > > You can have MM print more log by changing the log level in > > config/tools-log4j.properties > > > > On Sun, Feb 15, 2015 at 8:39 AM, Alex Melville > > > wrote: > > > > > Hi Kafka'ers, > > > > > > > > > I am trying to get the Mirrormaker working with two separate clusters, > > one > > > as the source and the other as the target. The topic I'm trying to copy > > > over exists on both the source and target clusters. 
Here are the > relevant > > > entries in my consumer and producer properties files, which I'm > > specifying in > > > the command I run to start the MM: > > > > > > *mirrorconsumer.properties:* > > > zookeeper.connect=ad-0104:2181 > > > zookeeper.connection.timeout.ms=6000 > > > group.id=test-consumer-group > > > > > > > > > *mirrorproducer.properties:* > > > metadata.broker.list=ba-02:9092,ba-03:9092 > > > producer.type=sync > > > compression.codec=none > > > serializer.class=kafka.serializer.DefaultEncoder > > > > > > > > > Then I run the following command: > > > bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config > > > ../config/mirrorconsumer.properties --producer.config > > > ../config/mirrorproducer.properties --whitelist consolemm > > > > > > so consolemm is the topic I'm trying to copy over. I've created consolemm > > > and have used the console-consumer to verify that there are messages in > > the > > > topic. > > > > > > When I run this command... nothing happens. The process keeps running > and > > > prints nothing to the Terminal. If I look in the output of the > zookeeper > on > > > the source cluster I get only the following: > > > > > > [2015-02-15 00:34:06,102] INFO Accepted socket connection from / > > > 10.7.162.75:42819 (org.apache.zookeeper.server.NIOServerCnxnFactory) > > > [2015-02-15 00:34:06,104] INFO Client attempting to establish new > session > > > at /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer) > > > [2015-02-15 00:34:06,106] INFO Established session 0x14b668b0fbe0033 > with > > > negotiated timeout 6000 for client /10.7.162.75:42819 > > > (org.apache.zookeeper.server.ZooKeeperServer) > > > > > > > > > and when I look at the output of one of the brokers on the source > > cluster I > > > get: > > > > > > [2015-02-15 00:32:14,382] INFO Closing socket connection to / 10.7.162.75 > > . > > > (kafka.network.Processor) > > > > > > and there is no output on the zookeeper on the target cluster. 
> > > > > > > > > > > > Any advice on what is causing MM to not properly copy over data to the > > > target cluster would be extremely helpful. > > > > > > -Alex > > > > > > > > > > > -- > > Regards, > > Tao > > > -- Regards, Tao
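Tao's two suggestions above amount to consumer-side settings. A sketch of the adjusted mirrorconsumer.properties (the ZooKeeper host is from the thread; the group name is made up for illustration):

```properties
# mirrorconsumer.properties -- sketch; assumes the source ZK is ad-0104
zookeeper.connect=ad-0104:2181
zookeeper.connection.timeout.ms=6000
# a fresh group id ensures no previously committed offsets exist in ZK ...
group.id=mm-debug-group
# ... and with no initial offset, start from the beginning of the log
# instead of the default (largest), so existing messages get mirrored
auto.offset.reset=smallest
```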
Re: Simple Consumer and offsets
Yeah that is a good point - will do the update as part of the doc changes in KAFKA-1729 On Thu, Feb 19, 2015 at 09:26:30PM -0500, Evan Huus wrote: > On Thu, Feb 19, 2015 at 8:43 PM, Joel Koshy wrote: > > > If you are using v0 of OffsetCommit/FetchRequest then you can issue > > that to any broker. For version > 0 you will need to issue it to the > > coordinator. You can discover the coordinator by sending a > > ConsumerMetadataRequest to any broker. > > > > The protocol spec [1] still says "Currently the supported version for all > APIs is 0". Based on your message above that is no longer true, so could > somebody familiar with the changes please update the spec appropriately? > > Thanks, > Evan > > [1] > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol > > > > > On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote: > > > Joel/All, > > > The SimpleConsumer constructor requires a specific host and port. > > > > > > Can this be any broker? > > > If it needs to be a specific broker, for 0.8.2, should this be the > > offset coordinator? For 0.8.1, does it matter? > > > -Suren > > > > > > > > > On Thursday, February 19, 2015 10:43 AM, Joel Koshy < > > jjkosh...@gmail.com> wrote: > > > > > > > > > I see - yes, you can use the SimpleConsumer for that. However, your > > > high-level consumers need to be shutdown while you do that (otherwise > > > they may auto-commit while you are resetting offsets). > > > > > > Thanks, > > > > > > Joel > > > > > > On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: > > > > We are using the High Level Consumer API to interact with Kafka for > > our normal use cases. > > > > > > > > However, on consumer restart in the case of consumer failures, we want > > to be able to manually > > > > reset offsets in certain situations. > > > > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. 
:-) > > > > It looked like instantiating a SimpleConsumer just to reset offsets on > > restart was a viable option, while continuing to use the High Level > > Consumer for our normal operations. Not sure if there is a better way that > > is compatible across 0.8.1 and 0.8.2. > > > > -Suren > > > > > > > > > > > > On Thursday, February 19, 2015 10:25 AM, Joel Koshy < > > jjkosh...@gmail.com> wrote: > > > > > > > > > > > > Not sure what you mean by using the SimpleConsumer on failure > > > > recovery. Can you elaborate on this? > > > > > > > > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: > > > > > Haven't used either one now. Sounds like 0.8.2.1 will help. > > > > > We are using the High Level Consumer generally but are thinking to > > use the SimpleConsumer on failure recovery to set the offsets. > > > > > Is that the recommended approach for this use case? > > > > > Thanks. > > > > > -Suren > > > > > > > > > > > > > > > On Thursday, February 19, 2015 9:40 AM, Joel Koshy < > > jjkosh...@gmail.com> wrote: > > > > > > > > > > > > > > > Are you using it from Java or Scala? i.e., are you using the > > > > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer > > > > > > > > > > In 0.8.2 javaapi we explicitly set version 0 of the > > > > > OffsetCommitRequest/OffsetFetchRequest which means it will > > > > > commit/fetch to/from ZooKeeper only. If you use the scala API you can > > > > > create an OffsetCommitRequest with version set to 1 (which will allow > > > > > you to commit to Kafka). > > > > > > > > > > Since we are doing an 0.8.2.1 release we will make the above more > > > > > consistent. i.e., you can create OffsetCommitRequests with version 1 > > > > > even from the javaapi. I will be updating the documentation on this > > to > > > > > make it clearer. 
> > > > > > > > > > Thanks, > > > > > > > > > > Joel > > > > > > > > > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > > > > > > Joel, > > > > > > Looking at SimpleConsumer in the 0.8.2 code, it is using > > OffsetCommitRequest and sending that over to a broker. > > > > > > Is the broker storing that in ZK? > > > > > > -Suren > > > > > > > > > > > > > > > > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy < > > jjkosh...@gmail.com> wrote: > > > > > > > > > > > > > > > > > > Hi Chris, > > > > > > > > > > > > In 0.8.2, the simple consumer Java API supports committing/fetching > > > > > > offsets that are stored in ZooKeeper. You don't need to issue any > > > > > > ConsumerMetadataRequest for this. Unfortunately, the API currently > > > > > > does not support fetching offsets that are stored in Kafka. > > > > > > > > > > > > Thanks, > > > > > > > > > > > > Joel > > > > > > > > > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott > > wrote: > > > > > > > Hi, > > > > > > > > > > > > > > I am still using 0.8.1.1 because of the CPU use concerns. > > > > > > > > > > > > > > I'm confused about why the SimpleConsumer has: > > > > > > > > > > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > > > > > > > > > > > a
Re: Simple Consumer and offsets
On Thu, Feb 19, 2015 at 8:43 PM, Joel Koshy wrote: > If you are using v0 of OffsetCommit/FetchRequest then you can issue > that to any broker. For version > 0 you will need to issue it to the > coordinator. You can discover the coordinator by sending a > ConsumerMetadataRequest to any broker. > The protocol spec [1] still says "Currently the supported version for all APIs is 0". Based on your message above that is no longer true, so could somebody familiar with the changes please update the spec appropriately? Thanks, Evan [1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol > On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote: > > Joel/All, > > The SimpleConsumer constructor requires a specific host and port. > > > > Can this be any broker? > > If it needs to be a specific broker, for 0.8.2, should this be the > offset coordinator? For 0.8.1, does it matter? > > -Suren > > > > > > On Thursday, February 19, 2015 10:43 AM, Joel Koshy < > jjkosh...@gmail.com> wrote: > > > > > > I see - yes, you can use the SimpleConsumer for that. However, your > > high-level consumers need to be shutdown while you do that (otherwise > > they may auto-commit while you are resetting offsets). > > > > Thanks, > > > > Joel > > > > On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: > > > We are using the High Level Consumer API to interact with Kafka for > our normal use cases. > > > > > > However, on consumer restart in the case of consumer failures, we want > to be able to manually > > > reset offsets in certain situations. > > > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) > > > It looked like instantiating a SimpleConsumer just to reset offsets on > restart was a viable option, while continuing to use the High Level > Consumer for our normal operations. Not sure if there is a better way that > is compatible across 0.8.1 and 0.8.2. 
> > > -Suren > > > > > > > > > On Thursday, February 19, 2015 10:25 AM, Joel Koshy < > jjkosh...@gmail.com> wrote: > > > > > > > > > Not sure what you mean by using the SimpleConsumer on failure > > > recovery. Can you elaborate on this? > > > > > > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: > > > > Haven't used either one now. Sounds like 0.8.2.1 will help. > > > > We are using the High Level Consumer generally but are thinking to > use the SimpleConsumer on failure recovery to set the offsets. > > > > Is that the recommended approach for this use case? > > > > Thanks. > > > > -Suren > > > > > > > > > > > > On Thursday, February 19, 2015 9:40 AM, Joel Koshy < > jjkosh...@gmail.com> wrote: > > > > > > > > > > > > Are you using it from Java or Scala? i.e., are you using the > > > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer > > > > > > > > In 0.8.2 javaapi we explicitly set version 0 of the > > > > OffsetCommitRequest/OffsetFetchRequest which means it will > > > > commit/fetch to/from ZooKeeper only. If you use the scala API you can > > > > create an OffsetCommitRequest with version set to 1 (which will allow > > > > you to commit to Kafka). > > > > > > > > Since we are doing an 0.8.2.1 release we will make the above more > > > > consistent. i.e., you can create OffsetCommitRequests with version 1 > > > > even from the javaapi. I will be updating the documentation on this > to > > > > make it clearer. > > > > > > > > Thanks, > > > > > > > > Joel > > > > > > > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > > > > > Joel, > > > > > Looking at SimpleConsumer in the 0.8.2 code, it is using > OffsetCommitRequest and sending that over to a broker. > > > > > Is the broker storing that in ZK? 
> > > > > -Suren > > > > > > > > > > > > > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy < > jjkosh...@gmail.com> wrote: > > > > > > > > > > > > > > > Hi Chris, > > > > > > > > > > In 0.8.2, the simple consumer Java API supports committing/fetching > > > > > offsets that are stored in ZooKeeper. You don't need to issue any > > > > > ConsumerMetadataRequest for this. Unfortunately, the API currently > > > > > does not support fetching offsets that are stored in Kafka. > > > > > > > > > > Thanks, > > > > > > > > > > Joel > > > > > > > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott > wrote: > > > > > > Hi, > > > > > > > > > > > > I am still using 0.8.1.1 because of the CPU use concerns. > > > > > > > > > > > > I'm confused about why the SimpleConsumer has: > > > > > > > > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > > > > > > > > > and > > > > > > > > > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > > > > > > > > > > > but no way that I can see to issue a ConsumerMetadataRequest, > which is > > > > > > what I think when restarting my consumers so that they can begin > > > > > > working where they last left off (in the event that they were > stopped > > > > > > for a while then restarted some time later, and ne
Re: Simple Consumer and offsets
If you are using v0 of OffsetCommit/FetchRequest then you can issue that to any broker. For version > 0 you will need to issue it to the coordinator. You can discover the coordinator by sending a ConsumerMetadataRequest to any broker. On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote: > Joel/All, > The SimpleConsumer constructor requires a specific host and port. > > Can this be any broker? > If it needs to be a specific broker, for 0.8.2, should this be the offset > coordinator? For 0.8.1, does it matter? > -Suren > > > On Thursday, February 19, 2015 10:43 AM, Joel Koshy > wrote: > > > I see - yes, you can use the SimpleConsumer for that. However, your > high-level consumers need to be shutdown while you do that (otherwise > they may auto-commit while you are resetting offsets). > > Thanks, > > Joel > > On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: > > We are using the High Level Consumer API to interact with Kafka for our > > normal use cases. > > > > However, on consumer restart in the case of consumer failures, we want to > > be able to manually > > reset offsets in certain situations. > > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) > > It looked like instantiating a SimpleConsumer just to reset offsets on > > restart was a viable option, while continuing to use the High Level > > Consumer for our normal operations. Not sure if there is a better way that > > is compatible across 0.8.1 and 0.8.2. > > -Suren > > > > > > On Thursday, February 19, 2015 10:25 AM, Joel Koshy > > wrote: > > > > > > Not sure what you mean by using the SimpleConsumer on failure > > recovery. Can you elaborate on this? > > > > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: > > > Haven't used either one now. Sounds like 0.8.2.1 will help. > > > We are using the High Level Consumer generally but are thinking to use > > > the SimpleConsumer on failure recovery to set the offsets. > > > Is that the recommended approach for this use case? > > > Thanks. 
> > > -Suren > > > > > > > > > On Thursday, February 19, 2015 9:40 AM, Joel Koshy > > > wrote: > > > > > > > > > Are you using it from Java or Scala? i.e., are you using the > > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer > > > > > > In 0.8.2 javaapi we explicitly set version 0 of the > > > OffsetCommitRequest/OffsetFetchRequest which means it will > > > commit/fetch to/from ZooKeeper only. If you use the scala API you can > > > create an OffsetCommitRequest with version set to 1 (which will allow > > > you to commit to Kafka). > > > > > > Since we are doing an 0.8.2.1 release we will make the above more > > > consistent. i.e., you can create OffsetCommitRequests with version 1 > > > even from the javaapi. I will be updating the documentation on this to > > > make it clearer. > > > > > > Thanks, > > > > > > Joel > > > > > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > > > > Joel, > > > > Looking at SimpleConsumer in the 0.8.2 code, it is using > > > > OffsetCommitRequest and sending that over to a broker. > > > > Is the broker storing that in ZK? > > > > -Suren > > > > > > > > > > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy > > > > wrote: > > > > > > > > > > > > Hi Chris, > > > > > > > > In 0.8.2, the simple consumer Java API supports committing/fetching > > > > offsets that are stored in ZooKeeper. You don't need to issue any > > > > ConsumerMetadataRequest for this. Unfortunately, the API currently > > > > does not support fetching offsets that are stored in Kafka. > > > > > > > > Thanks, > > > > > > > > Joel > > > > > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: > > > > > Hi, > > > > > > > > > > I am still using 0.8.1.1 because of the CPU use concerns. 
> > > > > > > > > > I'm confused about why the SimpleConsumer has: > > > > > > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > > > > > > > and > > > > > > > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > > > > > > > > > but no way that I can see to issue a ConsumerMetadataRequest, which is > > > > > what I think when restarting my consumers so that they can begin > > > > > working where they last left off (in the event that they were stopped > > > > > for a while then restarted some time later, and new messages had come > > > > > in). > > > > > > > > > > The fetchOffsets() works on time, usually it looks like you send it > > > > > Earliest or Latest (beginning or end of what's currently in the > > > > > stream). > > > > > > > > > > I realize the documentation says this: > > > > > > > > > > > > > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require > > > > > > a significant amount of work not needed in the Consumer Groups: > > > > > > > > > > > > 1. You must keep track of the offsets in your application to > > > > > >know where you left off consuming. > >
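Joel's routing rule above (a version-0 OffsetCommitRequest can go to any broker; version >= 1 must go to the group's offset coordinator, which is itself discovered by sending a ConsumerMetadataRequest to any broker) can be sketched as pure decision logic; this is illustrative pseudologic, not real client code:

```python
def commit_target(version, brokers, coordinator):
    """Return the broker an OffsetCommitRequest of this version may go to.

    Version 0 commits to ZooKeeper and any broker will handle it;
    version >= 1 commits to Kafka and must be sent to the offset
    coordinator (found via ConsumerMetadataRequest beforehand).
    """
    if version == 0:
        return brokers[0]   # any broker works; pick the first for simplicity
    return coordinator      # must be the group's offset coordinator
```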
Re: Consuming a snapshot from log compacted topic
The log end offset (of a partition) changes when messages are appended to the partition. (It is not correlated with the consumer's offset). On Thu, Feb 19, 2015 at 08:58:10PM +, Will Funnell wrote: > So at what point does the log end offset change? When you commit? > > On 19 February 2015 at 18:47, Joel Koshy wrote: > > > > If I consumed up to the log end offset and log compaction happens in > > > between, I would have missed some messages. > > > > Compaction actually only runs on the rolled over segments (not the > > active - i.e., latest segment). The log-end-offset will be in the > > latest segment which does not participate in compaction. > > > > > > The log end offset is just the end of the committed messages in the log > > > > (the last thing the consumer has access to). It isn't the same as the > > > > cleaner point but is always later than it so it would work just as > > well. > > > > > > Isn't this just roughly the same value as using c.getOffsetsBefore() > > with a > > > partitionRequestTime of -1? > > > > > > > > > Although its always later than the cleaner point, surely log compaction > > is > > > still an issue here. > > > > > > If I consumed up to the log end offset and log compaction happens in > > > between, I would have missed some messages. > > > > > > > > > My thinking was that if you knew the log cleaner point, you could: > > > > > > Make a note of the starting offset > > > Consume till end of log > > > Check my starting point is ahead of current cleaner point, otherwise > > loop. > > > > > > > > > I appreciate there is a chance I misunderstood your point. > > > > > > On 19 February 2015 at 18:02, Jay Kreps wrote: > > > > > > > The log end offset is just the end of the committed messages in the log > > > > (the last thing the consumer has access to). It isn't the same as the > > > > cleaner point but is always later than it so it would work just as > > well. 
> > > > > > > > -Jay > > > > > > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell > > > > wrote: > > > > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is > > > > > > along the lines of: we expose the log-end-offset (actually the high > > > > > > watermark) of the partition in the fetch response. However, this is > > > > > > not exposed to the consumer (either in the new ConsumerRecord class > > > > > > or the existing MessageAndMetadata class). If we did, then if you > > > > > > were to consume a record you can check that it has offsets up to > > the > > > > > > log-end offset. If it does then you would know for sure that you > > have > > > > > > consumed everything for that partition > > > > > > > > > > To confirm then, the log-end-offset is the same as the cleaner point? > > > > > > > > > > > > > > > > > > > > On 19 February 2015 at 03:10, Jay Kreps wrote: > > > > > > > > > > > Yeah I was thinking either along the lines Joel was suggesting or > > else > > > > > > adding a logEndOffset(TopicPartition) method or something like > > that. As > > > > > > Joel says the consumer actually has this information internally (we > > > > > return > > > > > > it with the fetch request) but doesn't expose it. > > > > > > > > > > > > -Jay > > > > > > > > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy > > > > wrote: > > > > > > > > > > > > > > > 2. Make the log end offset available more easily in the > > consumer. > > > > > > > > > > > > > > > > Was thinking something would need to be added in > > LogCleanerManager, > > > > > in > > > > > > > the > > > > > > > > updateCheckpoints function. Where would be best to publish the > > > > > > > information > > > > > > > > to make it more easily available, or would you just expose the > > > > > > > > offset-cleaner-checkpoint file as it is? > > > > > > > > Is it right you would also need to know which > > > > > offset-cleaner-checkpoint > > > > > > > > entry related to each active partition? 
> > > > > > > > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it > > is > > > > > > > along the lines of: we expose the log-end-offset (actually the > > high > > > > > > > watermark) of the partition in the fetch response. However, this > > is > > > > > > > not exposed to the consumer (either in the new ConsumerRecord > > class > > > > > > > or the existing MessageAndMetadata class). If we did, then if you > > > > > > > were to consume a record you can check that it has offsets up to > > the > > > > > > > log-end offset. If it does then you would know for sure that you > > have > > > > > > > consumed everything for that partition. > > > > > > > > > > > > > > > Yes, was looking at this initially, but as we have 100-150 > > writes > > > > per > > > > > > > > second, it could be a while before there is a pause long > > enough to > > > > > > check > > > > > > > it > > > > > > > > has caught up. Even with the consumer timeout set to -1, it > > takes > > > > > some > > > > > > > time > > > > > > > > to query the max offset values, which is still lo
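Jay's completeness criterion above (you have a full snapshot once you have consumed up to the log-end offset captured when the snapshot began) can be sketched as a fold over (offset, key, value) tuples. This is a toy model of the logic, not consumer API code:

```python
def take_snapshot(messages, log_end_offset):
    """Fold consumed messages into a key -> value snapshot of a compacted
    topic partition, and report whether the snapshot is complete, i.e.
    whether consumption reached the log-end offset captured at the start.
    """
    state = {}
    last_offset = -1
    for offset, key, value in messages:
        state[key] = value          # compacted topic: latest value per key wins
        last_offset = offset
    complete = last_offset + 1 >= log_end_offset
    return state, complete
```

If `complete` comes back False, the consumer simply keeps fetching; because compaction never touches the active segment, the captured log-end offset is always reachable.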
Re: New Consumer Offset management in 0.8.2
Yes it is supported in 0.8.2-beta. It is documented on the site - you will need to set offsets.storage to kafka. On Thu, Feb 19, 2015 at 03:57:31PM -0500, Matthew Butt wrote: > I'm having a hard time figuring out if the new Kafka-based offset > management in the high-level Scala Consumer is implemented in the current > version of 0.8.2-beta. If I implement a high-level consumer, will it use > the new system, or will it still be storing in zookeeper? Do I need to wait > for the Java consumer to take advantage of it? > > -- > - Matt
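For reference, a sketch of the relevant consumer properties for 0.8.2-beta. Only offsets.storage comes from Joel's answer; dual.commit.enabled is the companion migration setting described in the 0.8.2 docs and should be double-checked there:

```properties
# consumer.properties -- sketch for Kafka-based offset storage in 0.8.2
offsets.storage=kafka
# while migrating from ZooKeeper-based offsets, commit to both systems
dual.commit.enabled=true
```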
Re: NetworkProcessorAvgIdlePercent
Jun, I am already using the latest release 0.8.2.1. -Zakee On Thu, Feb 19, 2015 at 2:46 PM, Jun Rao wrote: > Could you try the 0.8.2.1 release being voted on now? It fixes a CPU issue > and should reduce the CPU load in the network thread. > > Thanks, > > Jun > > On Thu, Feb 19, 2015 at 11:54 AM, Zakee wrote: > > > Kafka documentation recommends <0.3 for the above metric. I assume the processor > is > > busier if this goes below 0.3, and obviously it being < 0.3 for long does > > not seem to be a good sign. > > > > What should be our criteria to raise an alert? I thought it should be > when > > its value goes below 0.3. However, the value seems to be below 0.3 a lot > of > > the times, almost always if we take samples every five mins. What should > be > > the threshold to raise an alarm? > > > > What would be the impact of having this below 0.3, or even zero, like most > of > > the times? > > > > > > -Zakee
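One way to act on Zakee's question is to alert on sustained, rather than momentary, dips of NetworkProcessorAvgIdlePercent below 0.3, since single low samples are common. A sketch of such a check; the window size is an arbitrary illustrative choice, not a Kafka recommendation:

```python
def should_alert(samples, threshold=0.3, consecutive=3):
    """Alert when NetworkProcessorAvgIdlePercent has stayed below the
    threshold for the last `consecutive` samples, smoothing over blips.
    Lower values mean the network threads are busier.
    """
    if len(samples) < consecutive:
        return False
    return all(s < threshold for s in samples[-consecutive:])
```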
Re: data corruption like behavior
[2015-02-05 14:21:09,708] ERROR [ReplicaFetcherThread-2-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 147301; ClientId: ReplicaFetcherThread-2-1; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [site.db.people,6] -> PartitionFetchInfo(0,1048576),[site.db.main,4] -> PartitionFetchInfo(0,1048576),[site.db.school,7] -> PartitionFetchInfo(0,1048576),[site.db.people,2] -> PartitionFetchInfo(0,1048576),[k3.hydra,6] -> PartitionFetchInfo(3,1048576),[site.db.school,3] -> PartitionFetchInfo(0,1048576),[site.db.main,0] -> PartitionFetchInfo(0,1048576),[site.db.cmphotos,2] -> PartitionFetchInfo(2245,1048576),[site.db.cmphotos,6] -> PartitionFetchInfo(2220,1048576) (kafka.server.ReplicaFetcherThread) java.net.ConnectException: Connection refused These were some of the errors from the server log. Didn't find any on the producer side of things. On Thu, Feb 19, 2015 at 4:30 PM, Jun Rao wrote: > Is there any error in the producer log? Is there any pattern in the > messages being lost? > > Thanks, > > Jun > > On Thu, Feb 19, 2015 at 4:20 PM, Karts wrote: > > > yes i did. > > > > On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao wrote: > > > > > Did you consume the messages from the beginning of the log? > > > > > > Thanks, > > > > > > Jun > > > > > > On Thu, Feb 19, 2015 at 12:18 PM, Karts wrote: > > > > > > > but they have always been up. I mean when i was testing, all the > > > zookeepers > > > > were up. and all the kafka nodes were up. its just that I changed the > > > > number of zookeeper nodes in my first test iteration. second and > third > > > were > > > > still the same. not sure why the topics were losing some messages. > > > > > > > > On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao wrote: > > > > > > > > > Zookeeper requires a majority of the nodes to be up for the service > > to > > > be > > > > > available. Kafka relies on Zookeeper to be always available. 
> > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > On Thu, Feb 19, 2015 at 11:15 AM, Karts > wrote: > > > > > > > > > > > I have noticed some strange patterns when testing with the 0.8.1 > > > build > > > > > and > > > > > > the 0.8.2 builds, and are listed below. > > > > > > 1. So I setup a brand new cluster [3 kafka nodes with 3 > > zookeepers], > > > > > > created 2 topics via the API calls, everything went fine and was > > > > > > successfully able to view my messages in my consumers. There were > > no > > > > > > messages lost. All is happy. Now, I change my setup to just have 1 > > > > > > zookeeper. and do my test again, i lose some messages. I have > > checked > > > > > that > > > > > > all my configs are pointing to just 1 zookeeper and there was no > > > > mention > > > > > of > > > > > > the other 2 offline zookeepers. any idea why ? > > > > > > 2. I revert back my settings to the original config, all 3 nodes > > are > > > > > > online, no errors, send messages to same old topic, and i am > still > > > > > losing > > > > > > some messages. I deleted all the old topic files [to follow the > > > > 'cleanup' > > > > > > process], create a new topic, and i am successfully able to > receive > > > all > > > > > > messages. no loss whatsoever. > > > > > > 3. Now in this state, i upgrade to 0.8.2, and try sending > messages > > to > > > > the > > > > > > topic that was made after the above cleanup, and i am losing > > messages > > > > > > again. > > > > > > > > > > > > Am i making sense? I mean this is a very strange behavior, and if > > > > anyone > > > > > > can comment on this [please correct me if i have done something > > > 'very' > > > > > > wrong].. > > > > > > > > > > > > Thanks.. > > > > > > > > > > > > > > > > > > > > >
Re: data corruption like behavior
Is there any error in the producer log? Is there any pattern in the messages being lost? Thanks, Jun On Thu, Feb 19, 2015 at 4:20 PM, Karts wrote: > yes i did. > > On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao wrote: > > > Did you consume the messages from the beginning of the log? > > > > Thanks, > > > > Jun > > > > On Thu, Feb 19, 2015 at 12:18 PM, Karts wrote: > > > > > but they have always been up. I mean when i was testing, all the > > zookeepers > > > were up. and all the kafka nodes were up. its just that I changed the > > > number of zookeeper nodes in my first test iteration. second and third > > were > > > still the same. not sure why the topics were losing some messages. > > > > > > On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao wrote: > > > > > > > Zookeeper requires a majority of the nodes to be up for the service > to > > be > > > > available. Kafka relies on Zookeeper to be always available. > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > On Thu, Feb 19, 2015 at 11:15 AM, Karts wrote: > > > > > > > > > I have noticed some strange patterns when testing with the 0.8.1 > > build > > > > and > > > > > the 0.8.2 builds, and are listed below. > > > > > 1. So I setup a brand new cluster [3 kafka nodes with 3 > zookeepers], > > > > > created 2 topics via the API calls, everything went fine and was > > > > > successfully able to view my messages in my consumers. There were > no > > > > > messages lost. All is happy. Now, I change my setup to just have 1 > > > > > zookeeper. and do my test again, i lose some messages. I have > checked > > > > that > > > > > all my configs are pointing to just 1 zookeeper and there was no > > > mention > > > > of > > > > > the other 2 offline zookeepers. any idea why ? > > > > > 2. I revert back my settings to the original config, all 3 nodes > are > > > > > online, no errors, send messages to same old topic, and i am still > > > > loosing > > > > > some messages. 
I deleted all the old topic files [to follow the > > > 'cleanup' > > > > > process], create a new topic, and i am successfully able to receive > > all > > > > > messages. no loss whatsoever. > > > > > 3. Now in this state, i upgrade to 0.8.2, and try sending messages > to > > > the > > > > > topic that was made after the above cleanup, and i am losing > messages > > > > > again. > > > > > > > > > > Am i making sense? I mean this is a very strange behavior, and if > > > anyone > > > > > can comment on this [please correct me if i have done something > > 'very' > > > > > wrong].. > > > > > > > > > > Thanks.. > > > > > > > > > > > > > > >
Re: data corruption like behavior
actually i take that back. it reads from where the last offset left off. On Thu, Feb 19, 2015 at 4:20 PM, Karts wrote: > yes i did. > > On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao wrote: > >> Did you consume the messages from the beginning of the log? >> >> Thanks, >> >> Jun >> >> On Thu, Feb 19, 2015 at 12:18 PM, Karts wrote: >> >> > but they have always been up. I mean when i was testing, all the >> zookeepers >> > were up. and all the kafka nodes were up. its just that I changed the >> > number of zookeeper nodes in my first test iteration. second and third >> were >> > still the same. not sure why the topics were losing some messages. >> > >> > On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao wrote: >> > >> > > Zookeeper requires a majority of the nodes to be up for the service >> to be >> > > available. Kafka relies on Zookeeper to be always available. >> > > >> > > Thanks, >> > > >> > > Jun >> > > >> > > On Thu, Feb 19, 2015 at 11:15 AM, Karts wrote: >> > > >> > > > I have noticed some strange patterns when testing with the 0.8.1 >> build >> > > and >> > > > the 0.8.2 builds, and are listed below. >> > > > 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers], >> > > > created 2 topics via the API calls, everything went fine and was >> > > > successfully able to view my messages in my consumers. There were no >> > > > messages lost. All is happy. Now, I change my setup to just have 1 >> > > > zookeeper. and do my test again, i lose some messages. I have >> checked >> > > that >> > > > all my configs are pointing to just 1 zookeeper and there was no >> > mention >> > > of >> > > > the other 2 offline zookeepers. any idea why ? >> > > > 2. I revert back my settings to the original config, all 3 nodes are >> > > > online, no errors, send messages to same old topic, and i am still >> > > loosing >> > > > some messages. 
I deleted all the old topic files [to follow the >> > 'cleanup' >> > > > process], create a new topic, and i am successfully able to receive >> all >> > > > messages. no loss whatsoever. >> > > > 3. Now in this state, i upgrade to 0.8.2, and try sending messages >> to >> > the >> > > > topic that was made after the above cleanup, and i am losing >> messages >> > > > again. >> > > > >> > > > Am i making sense? I mean this is a very strange behavior, and if >> > anyone >> > > > can comment on this [please correct me if i have done something >> 'very' >> > > > wrong].. >> > > > >> > > > Thanks.. >> > > > >> > > >> > >> > >
Re: data corruption like behavior
yes i did. On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao wrote: > Did you consume the messages from the beginning of the log? > > Thanks, > > Jun > > On Thu, Feb 19, 2015 at 12:18 PM, Karts wrote: > > > but they have always been up. I mean when i was testing, all the > zookeepers > > were up. and all the kafka nodes were up. its just that I changed the > > number of zookeeper nodes in my first test iteration. second and third > were > > still the same. not sure why the topics were losing some messages. > > > > On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao wrote: > > > > > Zookeeper requires a majority of the nodes to be up for the service to > be > > > available. Kafka relies on Zookeeper to be always available. > > > > > > Thanks, > > > > > > Jun > > > > > > On Thu, Feb 19, 2015 at 11:15 AM, Karts wrote: > > > > > > > I have noticed some strange patterns when testing with the 0.8.1 > build > > > and > > > > the 0.8.2 builds, and are listed below. > > > > 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers], > > > > created 2 topics via the API calls, everything went fine and was > > > > successfully able to view my messages in my consumers. There were no > > > > messages lost. All is happy. Now, I change my setup to just have 1 > > > > zookeeper. and do my test again, i lose some messages. I have checked > > > that > > > > all my configs are pointing to just 1 zookeeper and there was no > > mention > > > of > > > > the other 2 offline zookeepers. any idea why ? > > > > 2. I revert back my settings to the original config, all 3 nodes are > > > > online, no errors, send messages to same old topic, and i am still > > > loosing > > > > some messages. I deleted all the old topic files [to follow the > > 'cleanup' > > > > process], create a new topic, and i am successfully able to receive > all > > > > messages. no loss whatsoever. > > > > 3. 
Now in this state, i upgrade to 0.8.2, and try sending messages to > > the > > > > topic that was made after the above cleanup, and i am losing messages > > > > again. > > > > > > > > Am i making sense? I mean this is a very strange behavior, and if > > anyone > > > > can comment on this [please correct me if i have done something > 'very' > > > > wrong].. > > > > > > > > Thanks.. > > > > > > > > > >
Re: KafkaProducer.send contract
Hey Jun, That's what I've got right now: a semaphore before send() and a release in the callback. Am I correct in understanding that there's no way to do any batching with KafkaProducer itself (other than having a "bulk" message, which would just be a single message containing multiple messages for a particular Node)? JAmes On Thu, Feb 19, 2015 at 2:50 PM, Jun Rao wrote: > You can register a callback for each message sent. The callback will be > called when the message is sent successfully or failed. > > Thanks, > > Jun > > On Tue, Feb 17, 2015 at 4:11 PM, JAmes Atwill > wrote: > > > Hi! > > > > I'm using the new KafkaProducer in 0.8.2.0. > > > > I have thousands of "Nodes" which receive messages. Each message > > idempotently mutates the state of the Node, so while duplicate messages > are > > fine, missed messages are not. > > > > I'm writing these messages into a topic with dozens of partitions. > > > > Am I correct in believing that I'll have to manually manage having one > > message "in flight" per "node" at a time? Or is there a mechanism to say > > "This message and all messages after it for this partition were > rejected"? > > (or something similar) > > > > Thanks! > > > > JAmes > > >
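The pattern JAmes describes above can be sketched as a small class: at most one message "in flight" per node, gated by a per-node Semaphore that is acquired before send() and released when the completion callback fires. This is a minimal sketch, not the producer's actual API surface — KafkaProducer is stubbed with a Runnable so the gating logic itself is runnable; in real code the Runnable would be producer.send(record, callback), and the release() would happen inside Callback.onCompletion().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: one message in flight per node, enforced with per-node semaphores.
public class PerNodeInFlightLimiter {
    private final Map<String, Semaphore> permits = new ConcurrentHashMap<String, Semaphore>();

    private Semaphore permitFor(String node) {
        permits.putIfAbsent(node, new Semaphore(1)); // one permit => one in-flight message
        return permits.get(node);
    }

    /** Blocks until the previous message for this node has completed. */
    public void send(String node, Runnable stubSend) {
        final Semaphore permit = permitFor(node);
        permit.acquireUninterruptibly();
        try {
            stubSend.run(); // stand-in for KafkaProducer.send(record, callback)
        } finally {
            permit.release(); // in real code: released from the producer Callback
        }
    }

    public static void main(String[] args) {
        PerNodeInFlightLimiter limiter = new PerNodeInFlightLimiter();
        final AtomicInteger delivered = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            limiter.send("node-1", new Runnable() {
                public void run() { delivered.incrementAndGet(); }
            });
        }
        if (delivered.get() != 5) throw new AssertionError("expected 5 deliveries");
        System.out.println("delivered=" + delivered.get());
    }
}
```

Note the sketch releases the permit synchronously for simplicity; with an asynchronous callback the release moves into onCompletion(), which is exactly why missed releases (e.g. on a dropped callback) would deadlock that node's sends.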
Re: KafkaProducer.send contract
You can register a callback for each message sent. The callback will be called when the message is sent successfully or fails. Thanks, Jun On Tue, Feb 17, 2015 at 4:11 PM, JAmes Atwill wrote: > Hi! > > I'm using the new KafkaProducer in 0.8.2.0. > > I have thousands of "Nodes" which receive messages. Each message > idempotently mutates the state of the Node, so while duplicate messages are > fine, missed messages are not. > > I'm writing these messages into a topic with dozens of partitions. > > Am I correct in believing that I'll have to manually manage having one > message "in flight" per "node" at a time? Or is there a mechanism to say > "This message and all messages after it for this partition were rejected"? > (or something similar) > > Thanks! > > JAmes >
Re: data corruption like behavior
Did you consume the messages from the beginning of the log? Thanks, Jun On Thu, Feb 19, 2015 at 12:18 PM, Karts wrote: > but they have always been up. I mean when i was testing, all the zookeepers > were up. and all the kafka nodes were up. its just that I changed the > number of zookeeper nodes in my first test iteration. second and third were > still the same. not sure why the topics were losing some messages. > > On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao wrote: > > > Zookeeper requires a majority of the nodes to be up for the service to be > > available. Kafka relies on Zookeeper to be always available. > > > > Thanks, > > > > Jun > > > > On Thu, Feb 19, 2015 at 11:15 AM, Karts wrote: > > > > > I have noticed some strange patterns when testing with the 0.8.1 build > > and > > > the 0.8.2 builds, and are listed below. > > > 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers], > > > created 2 topics via the API calls, everything went fine and was > > > successfully able to view my messages in my consumers. There were no > > > messages lost. All is happy. Now, I change my setup to just have 1 > > > zookeeper. and do my test again, i lose some messages. I have checked > > that > > > all my configs are pointing to just 1 zookeeper and there was no > mention > > of > > > the other 2 offline zookeepers. any idea why ? > > > 2. I revert back my settings to the original config, all 3 nodes are > > > online, no errors, send messages to same old topic, and i am still > > loosing > > > some messages. I deleted all the old topic files [to follow the > 'cleanup' > > > process], create a new topic, and i am successfully able to receive all > > > messages. no loss whatsoever. > > > 3. Now in this state, i upgrade to 0.8.2, and try sending messages to > the > > > topic that was made after the above cleanup, and i am losing messages > > > again. > > > > > > Am i making sense? 
I mean this is a very strange behavior, and if > anyone > > > can comment on this [please correct me if i have done something 'very' > > > wrong].. > > > > > > Thanks.. > > > > > >
Re: NetworkProcessorAvgIdlePercent
Could you try the 0.8.2.1 release being voted on now? It fixes a CPU issue and should reduce the CPU load in the network thread. Thanks, Jun On Thu, Feb 19, 2015 at 11:54 AM, Zakee wrote: > The Kafka documentation recommends keeping the above metric above 0.3. I assume the processor is > busier if this goes below 0.3, and its being < 0.3 for long does > not seem to be a good sign. > > What should our criteria be to raise an alert? I thought it should be when > its value goes below 0.3. However, the value seems to be below 0.3 a lot of > the time, almost always if we take samples every five minutes. What should > the threshold be to raise an alarm? > > What would be the impact of having this below 0.3, or even zero, most of > the time? > > > -Zakee
Re: big cpu jump on producer in face of broker outage
Jun, you are right. I tried the 0.8.2.0 producer with my test and confirmed that it fixed the CPU issue. Thanks, Steven On Thu, Feb 19, 2015 at 12:02 PM, Steven Wu wrote: > will try 0.8.2.1 on producer and report back result. > > On Thu, Feb 19, 2015 at 11:52 AM, Jun Rao wrote: > >> This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you >> try that version or 0.8.2.1 which is being voted now. >> >> Thanks, >> >> Jun >> >> On Thu, Feb 19, 2015 at 10:42 AM, Steven Wu wrote: >> >> > forgot to mention in case it matters >> > producer: 0.8.2-beta >> > broker: 0.8.1.1 >> > >> > On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu >> wrote: >> > >> > > I think this is an issue caused by KAFKA-1788. >> > > >> > > I was trying to test producer resiliency to broker outage. In this >> > > experiment, I shutdown all brokers and see how producer behavior. >> > > >> > > Here are the observations >> > > 1) kafka producer can recover from kafka outage. i.e. send resumed >> after >> > > brokers came back >> > > 2) producer instance saw big cpu jump during outage. 28% -> 52% in one >> > > test. >> > > >> > > Note that I didn't observe cpu issue when new producer instance >> started >> > > with brokers outage. In this case, there are no messages accumulated >> in >> > the >> > > buffer, because KafkaProducer constructor failed with DNS lookup for >> > > route53 name. when brokers came up, my wrapper re-created >> KafkaProducer >> > > object and recover from outage with sending messages. >> > > >> > > Here is the cpu graph for a running producer instance where broker >> outage >> > > happened in the middle of test run. it shows cpu problem. >> > > >> > > >> > >> https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing >> > > >> > > Here is the cpu graph for a new producer instance where broker outage >> > > happened before instance startup. cpu is good here. 
>> > > >> > > >> > >> https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing >> > > >> > > Note that producer is a 4-core m1.xlarge instance. x-axis is time, >> y-axis >> > > is cpu util. >> > > >> > > Thanks, >> > > Steven >> > > >> > >> > >
New Consumer Offset management in 0.8.2
I'm having a hard time figuring out if the new Kafka-based offset management in the high-level Scala Consumer is implemented in the current version of 0.8.2-beta. If I implement a high-level consumer, will it use the new system, or will it still be storing in zookeeper? Do I need to wait for the Java consumer to take advantage of it? -- - Matt
Re: Consuming a snapshot from log compacted topic
So at what point does the log end offset change? When you commit? On 19 February 2015 at 18:47, Joel Koshy wrote: > > If I consumed up to the log end offset and log compaction happens in > > between, I would have missed some messages. > > Compaction actually only runs on the rolled over segments (not the > active - i.e., latest segment). The log-end-offset will be in the > latest segment which does not participate in compaction. > > > > The log end offset is just the end of the committed messages in the log > > > (the last thing the consumer has access to). It isn't the same as the > > > cleaner point but is always later than it so it would work just as > well. > > > > Isn't this just roughly the same value as using c.getOffsetsBefore() > with a > > partitionRequestTime of -1? > > > > > > Although its always later than the cleaner point, surely log compaction > is > > still an issue here. > > > > If I consumed up to the log end offset and log compaction happens in > > between, I would have missed some messages. > > > > > > My thinking was that if you knew the log cleaner point, you could: > > > > Make a note of the starting offset > > Consume till end of log > > Check my starting point is ahead of current cleaner point, otherwise > loop. > > > > > > I appreciate there is a chance I misunderstood your point. > > > > On 19 February 2015 at 18:02, Jay Kreps wrote: > > > > > The log end offset is just the end of the committed messages in the log > > > (the last thing the consumer has access to). It isn't the same as the > > > cleaner point but is always later than it so it would work just as > well. > > > > > > -Jay > > > > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell > > > wrote: > > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is > > > > > along the lines of: we expose the log-end-offset (actually the high > > > > > watermark) of the partition in the fetch response. 
However, this is > > > > > not exposed to the consumer (either in the new ConsumerRecord class > > > > > or the existing MessageAndMetadata class). If we did, then if you > > > > > were to consume a record you can check that it has offsets up to > the > > > > > log-end offset. If it does then you would know for sure that you > have > > > > > consumed everything for that partition > > > > > > > > To confirm then, the log-end-offset is the same as the cleaner point? > > > > > > > > > > > > > > > > On 19 February 2015 at 03:10, Jay Kreps wrote: > > > > > > > > > Yeah I was thinking either along the lines Joel was suggesting or > else > > > > > adding a logEndOffset(TopicPartition) method or something like > that. As > > > > > Joel says the consumer actually has this information internally (we > > > > return > > > > > it with the fetch request) but doesn't expose it. > > > > > > > > > > -Jay > > > > > > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy > > > wrote: > > > > > > > > > > > > > 2. Make the log end offset available more easily in the > consumer. > > > > > > > > > > > > > > Was thinking something would need to be added in > LogCleanerManager, > > > > in > > > > > > the > > > > > > > updateCheckpoints function. Where would be best to publish the > > > > > > information > > > > > > > to make it more easily available, or would you just expose the > > > > > > > offset-cleaner-checkpoint file as it is? > > > > > > > Is it right you would also need to know which > > > > offset-cleaner-checkpoint > > > > > > > entry related to each active partition? > > > > > > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it > is > > > > > > along the lines of: we expose the log-end-offset (actually the > high > > > > > > watermark) of the partition in the fetch response. However, this > is > > > > > > not exposed to the consumer (either in the new ConsumerRecord > class > > > > > > or the existing MessageAndMetadata class). 
If we did, then if you > > > > > > were to consume a record you can check that it has offsets up to > the > > > > > > log-end offset. If it does then you would know for sure that you > have > > > > > > consumed everything for that partition. > > > > > > > > > > > > > Yes, was looking at this initially, but as we have 100-150 > writes > > > per > > > > > > > second, it could be a while before there is a pause long > enough to > > > > > check > > > > > > it > > > > > > > has caught up. Even with the consumer timeout set to -1, it > takes > > > > some > > > > > > time > > > > > > > to query the max offset values, which is still long enough for > more > > > > > > > messages to arrive. > > > > > > > > > > > > Got it - thanks for clarifying. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On 18 February 2015 at 23:16, Joel Koshy > > > > wrote: > > > > > > > > > > > > > > > > You are also correct and perceptive to notice that if you > check > > > > the > > > > > > end > > > > > > > > of > > > > > > > > > the log then begin consuming and read up to that po
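The loop Will outlines above — note the starting offset, consume to the log-end offset, then verify the cleaner point has not passed the starting point (otherwise repeat) — can be sketched against a simulated compacted log. SimpleLog and its fields below are invented for illustration; they are not Kafka APIs, just enough structure to show where the race with compaction would be checked.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SnapshotLoop {
    /** Toy stand-in for a compacted partition: offset == list index. */
    static class SimpleLog {
        final List<Map.Entry<String, String>> records = new ArrayList<Map.Entry<String, String>>();
        volatile long cleanerPoint = 0; // offsets below this may be compacted away
        long logEndOffset() { return records.size(); }
        void append(String key, String value) {
            records.add(new AbstractMap.SimpleEntry<String, String>(key, value));
        }
    }

    /** Latest value per key, retried if compaction passes our start offset mid-read. */
    static Map<String, String> snapshot(SimpleLog log) {
        while (true) {
            long start = log.cleanerPoint;   // 1. note the starting offset
            long end = log.logEndOffset();   // 2. consume up to the log-end offset
            Map<String, String> state = new LinkedHashMap<String, String>();
            for (long o = start; o < end; o++) {
                Map.Entry<String, String> r = log.records.get((int) o);
                state.put(r.getKey(), r.getValue()); // later offsets overwrite earlier keys
            }
            if (start >= log.cleanerPoint) { // 3. start still ahead of the cleaner point?
                return state;                //    yes: snapshot saw every surviving key
            }                                //    no: compaction raced us, loop again
        }
    }

    public static void main(String[] args) {
        SimpleLog log = new SimpleLog();
        log.append("k1", "v1");
        log.append("k2", "v2");
        log.append("k1", "v3"); // later value for k1 wins
        Map<String, String> snap = snapshot(log);
        if (!"v3".equals(snap.get("k1")) || !"v2".equals(snap.get("k2"))) {
            throw new AssertionError("unexpected snapshot: " + snap);
        }
        System.out.println(snap);
    }
}
```

Against a real consumer, step 2 is where the thread's caveat applies: the log-end offset is later than the cleaner point, so only the check in step 3 protects you from a compaction pass that ran while you were reading.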
Re: data corruption like behavior
But they have always been up. I mean, when I was testing, all the zookeepers were up and all the kafka nodes were up; it's just that I changed the number of zookeeper nodes in my first test iteration. The second and third were still the same. Not sure why the topics were losing some messages. On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao wrote: > Zookeeper requires a majority of the nodes to be up for the service to be > available. Kafka relies on Zookeeper to be always available. > > Thanks, > > Jun > > On Thu, Feb 19, 2015 at 11:15 AM, Karts wrote: > > > I have noticed some strange patterns when testing with the 0.8.1 build > and > > the 0.8.2 builds, and are listed below. > > 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers], > > created 2 topics via the API calls, everything went fine and was > > successfully able to view my messages in my consumers. There were no > > messages lost. All is happy. Now, I change my setup to just have 1 > > zookeeper. and do my test again, i lose some messages. I have checked > that > > all my configs are pointing to just 1 zookeeper and there was no mention > of > > the other 2 offline zookeepers. any idea why ? > > 2. I revert back my settings to the original config, all 3 nodes are > > online, no errors, send messages to same old topic, and i am still > losing > > some messages. I deleted all the old topic files [to follow the 'cleanup' > > process], create a new topic, and i am successfully able to receive all > > messages. no loss whatsoever. > > 3. Now in this state, i upgrade to 0.8.2, and try sending messages to the > > topic that was made after the above cleanup, and i am losing messages > > again. > > > > Am i making sense? I mean this is a very strange behavior, and if anyone > > can comment on this [please correct me if i have done something 'very' > > wrong].. > > > > Thanks.. > > >
Re: Default MirrorMaker not copying over from source to target
Tao, I updated the mirrorconsumer.properties config file as you suggested, and upped the MM's log level to DEBUG. I have the output of the DEBUG logger in this pastebin; if you could take a minute to look for anything in its contents that would indicate a problem, that would be extremely helpful. Note that my servers' hostnames are of the form ad-010X or ba-0X, where X is some integer between 1 and 4. http://pastebin.com/rBsxx15A When I run the MirrorMaker and then spin up a console consumer to read from the source cluster, I get 0 messages consumed. Alex On Sun, Feb 15, 2015 at 3:00 AM, tao xiao wrote: > Alex, > > Are you sure you have data continually being sent to the topic in the source > cluster after you bring up MM? By default auto.offset.reset=largest in the MM > consumer config, which means MM only fetches the largest offset if the > consumer group has no initial offset in zookeeper. > > You can have MM print more log output by changing the log level in > config/tools-log4j.properties > > On Sun, Feb 15, 2015 at 8:39 AM, Alex Melville > wrote: > > > Hi Kafka'ers, > > > > > > I am trying to get the MirrorMaker working with two separate clusters, one > > as the source and the other as the target. The topic I'm trying to copy > > over exists on both the source and target clusters. 
Here are the relevant > > entries in my consumer and producer properties files, which I'm > specifying > > the command I run to start the MM: > > > > *mirrorconsumer.properties:* > > zookeeper.connect=ad-0104:2181 > > zookeeper.connection.timeout.ms=6000 > > group.id=test-consumer-group > > > > > > *mirrorproducer.properties:* > > metadata.broker.list=ba-02:9092,ba-03:9092 > > producer.type=sync > > compression.codec=none > > serializer.class=kafka.serializer.DefaultEncoder > > > > > > Then I run the following command: > > bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config > > ../config/mirrorconsumer.properties --producer.config > > ../config/mirrorproducer.properties --whitelist consolemm > > > > so consolemm is the topic I'm trying to copy over. I've created consolemm > > and have used to console-consumer to verify that there are messages in > the > > topic. > > > > When I run this command... nothing happens. The process keeps running and > > prints nothing to the Terminal. If I look in the output of the zookeeper > on > > the source cluster I get only the following: > > > > [2015-02-15 00:34:06,102] INFO Accepted socket connection from / > > 10.7.162.75:42819 (org.apache.zookeeper.server.NIOServerCnxnFactory) > > [2015-02-15 00:34:06,104] INFO Client attempting to establish new session > > at /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer) > > [2015-02-15 00:34:06,106] INFO Established session 0x14b668b0fbe0033 with > > negotiated timeout 6000 for client /10.7.162.75:42819 > > (org.apache.zookeeper.server.ZooKeeperServer) > > > > > > and when I look at the output of one of the brokers on the source > cluster I > > get: > > > > [2015-02-15 00:32:14,382] INFO Closing socket connection to /10.7.162.75 > . > > (kafka.network.Processor) > > > > and there is no output on the zookeeper on the target cluster. 
> > > > > > > > Any advice on what is causing MM to not properly copy over data to the > > target cluster would be extremely helpful. > > > > -Alex > > > > > > -- > Regards, > Tao >
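For reference, a mirror-consumer properties sketch along the lines of Tao's explanation above: the high-level consumer's auto.offset.reset defaults to largest, so with no committed offset MirrorMaker only sees messages produced after it starts; smallest makes it replay the topic from the earliest available offset. The hostnames and group id are Alex's values from the thread; treat this as an illustrative 0.8.x fragment, not a verified drop-in config.

```properties
# mirrorconsumer.properties — source-cluster consumer for MirrorMaker (0.8.x)
zookeeper.connect=ad-0104:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
# Default is 'largest': with no committed offset for the group, MM only sees
# messages produced after it starts. 'smallest' replays from the beginning.
auto.offset.reset=smallest
```

After changing this, the group's existing ZooKeeper offsets (if any) still win; the setting only applies when the group has no initial offset.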
Re: What conditions can cause Leader: -1 ?
Any error in the controller and state-change log? Thanks, Jun On Thu, Feb 12, 2015 at 7:28 AM, Omid Aladini wrote: > Hi, > > I'm experimenting with the following scenario: > > - 3 brokers are running (0,1 and 2) -- Kafka version 0.8.2.0 > - Continuously: restart broker number 0 by triggering controlled shutdown. > Sleep rand(10) seconds. repeat. > - Continuously: create 'simple-test-topic' (RF=2), write and read messages, > then delete the topic. repeat. > > After a while, broker 0 doesn't come back up any more due to "corrupt > index" error (but that's not my question for the moment). Looking at the > state of the topics: > > Topic:simple-test-topicPartitionCount:8ReplicationFactor:2 > Configs: > Topic: simple-test-topicPartition: 0Leader: -1Replicas: 1,2 >Isr: 1 > Topic: simple-test-topicPartition: 1Leader: -1Replicas: 2,0 >Isr: 2 > Topic: simple-test-topicPartition: 2Leader: -1Replicas: 0,1 >Isr: 1 > Topic: simple-test-topicPartition: 3Leader: -1Replicas: 1,0 >Isr: 1 > Topic: simple-test-topicPartition: 4Leader: -1Replicas: 2,1 >Isr: 1 > Topic: simple-test-topicPartition: 5Leader: -1Replicas: 0,2 >Isr: 2 > Topic: simple-test-topicPartition: 6Leader: -1Replicas: 1,2 >Isr: 1 > Topic: simple-test-topicPartition: 7Leader: -1Replicas: 2,0 >Isr: 2 > Topic:testPartitionCount:8ReplicationFactor:3Configs: > Topic: testPartition: 0Leader: 1Replicas: 1,2,0Isr: 2,1 > Topic: testPartition: 1Leader: 2Replicas: 2,0,1Isr: 2,1 > Topic: testPartition: 2Leader: 2Replicas: 0,1,2Isr: 2,1 > Topic: testPartition: 3Leader: 1Replicas: 1,0,2Isr: 2,1 > Topic: testPartition: 4Leader: 2Replicas: 2,1,0Isr: 2,1 > Topic: testPartition: 5Leader: 2Replicas: 0,2,1Isr: 2,1 > Topic: testPartition: 6Leader: 1Replicas: 1,2,0Isr: 2,1 > Topic: testPartition: 7Leader: 2Replicas: 2,0,1Isr: 2,1 > > .. at which point: > > - All 'simple-test-topic' partitions are leaderless. > - It's not possible to delete "simple-test-topic" any more. 
> - Calling 'kafka-preferred-replica-election.sh' successfully starts > election but doesn't have any effect. > > The other topic, named "test" (RF 3), is just sitting there and not > actively participating in the test. > > Now I'm wondering: > > - Which of the steps above could have caused "simple-test-topic" partitions > to become leaderless? > - How to recover in such situation in cases where broker 0 can or cannot be > recovered? > > Thanks, > Omid >
Re: big cpu jump on producer in face of broker outage
will try 0.8.2.1 on producer and report back result. On Thu, Feb 19, 2015 at 11:52 AM, Jun Rao wrote: > This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you > try that version or 0.8.2.1 which is being voted now. > > Thanks, > > Jun > > On Thu, Feb 19, 2015 at 10:42 AM, Steven Wu wrote: > > > forgot to mention in case it matters > > producer: 0.8.2-beta > > broker: 0.8.1.1 > > > > On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu > wrote: > > > > > I think this is an issue caused by KAFKA-1788. > > > > > > I was trying to test producer resiliency to broker outage. In this > > > experiment, I shutdown all brokers and see how producer behavior. > > > > > > Here are the observations > > > 1) kafka producer can recover from kafka outage. i.e. send resumed > after > > > brokers came back > > > 2) producer instance saw big cpu jump during outage. 28% -> 52% in one > > > test. > > > > > > Note that I didn't observe cpu issue when new producer instance started > > > with brokers outage. In this case, there are no messages accumulated in > > the > > > buffer, because KafkaProducer constructor failed with DNS lookup for > > > route53 name. when brokers came up, my wrapper re-created KafkaProducer > > > object and recover from outage with sending messages. > > > > > > Here is the cpu graph for a running producer instance where broker > outage > > > happened in the middle of test run. it shows cpu problem. > > > > > > > > > https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing > > > > > > Here is the cpu graph for a new producer instance where broker outage > > > happened before instance startup. cpu is good here. > > > > > > > > > https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing > > > > > > Note that producer is a 4-core m1.xlarge instance. x-axis is time, > y-axis > > > is cpu util. > > > > > > Thanks, > > > Steven > > > > > >
NetworkProcessorAvgIdlePercent
The Kafka documentation recommends keeping the above metric above 0.3. I assume the processor is busier if this goes below 0.3, and its being < 0.3 for long does not seem to be a good sign. What should our criteria be to raise an alert? I thought it should be when its value goes below 0.3. However, the value seems to be below 0.3 a lot of the time, almost always if we take samples every five minutes. What should the threshold be to raise an alarm? What would be the impact of having this below 0.3, or even zero, most of the time? -Zakee
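One way to avoid alarming on every transient dip of the metric discussed above is to treat 0.3 as the threshold but only fire when several consecutive samples stay below it. A minimal sketch follows; the window size and the choice of a consecutive-sample rule are illustrative tuning decisions, not values from the Kafka documentation.

```java
// Sketch: alert on NetworkProcessorAvgIdlePercent only after it has been
// below the threshold for `window` consecutive samples, so a single busy
// five-minute sample does not page anyone.
public class IdleAlert {
    private final double threshold;
    private final int window;
    private int consecutiveLow = 0;

    IdleAlert(double threshold, int window) {
        this.threshold = threshold;
        this.window = window;
    }

    /** Feed one sample; returns true when the alert should fire. */
    boolean sample(double avgIdlePercent) {
        if (avgIdlePercent < threshold) {
            consecutiveLow++;
        } else {
            consecutiveLow = 0; // any healthy sample resets the streak
        }
        return consecutiveLow >= window;
    }

    public static void main(String[] args) {
        IdleAlert alert = new IdleAlert(0.3, 3);
        boolean fired = false;
        // Three consecutive low samples, then a recovery.
        double[] samples = {0.5, 0.2, 0.25, 0.1, 0.6};
        for (double s : samples) {
            fired |= alert.sample(s);
        }
        if (!fired) throw new AssertionError("expected alert after 3 consecutive low samples");
        System.out.println("alert fired");
    }
}
```

Sustained values near zero would mean the network threads are saturated almost continuously, which is exactly the condition the streak rule is meant to surface while ignoring isolated dips.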
Re: Simple Consumer and offsets
Joel/All, The SimpleConsumer constructor requires a specific host and port. Can this be any broker? If it needs to be a specific broker, for 0.8.2, should this be the offset coordinator? For 0.8.1, does it matter? -Suren On Thursday, February 19, 2015 10:43 AM, Joel Koshy wrote: I see - yes, you can use the SimpleConsumer for that. However, your high-level consumers need to be shutdown while you do that (otherwise they may auto-commit while you are resetting offsets). Thanks, Joel On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: > We are using the High Level Consumer API to interact with Kafka for our > normal use cases. > > However, on consumer restart in the case of consumer failures, we want to be > able to manually > reset offsets in certain situations. > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) > It looked like instantiating a SimpleConsumer just to reset offsets on > restart was a viable option, while continuing to use the High Level Consumer > for our normal operations. Not sure if there is a better way that is > compatible across 0.8.1 and 0.8.2. > -Suren > > > On Thursday, February 19, 2015 10:25 AM, Joel Koshy > wrote: > > > Not sure what you mean by using the SimpleConsumer on failure > recovery. Can you elaborate on this? > > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: > > Haven't used either one now. Sounds like 0.8.2.1 will help. > > We are using the High Level Consumer generally but are thinking to use the > > SimpleConsumer on failure recovery to set the offsets. > > Is that the recommended approach for this use case? > > Thanks. > > -Suren > > > > > > On Thursday, February 19, 2015 9:40 AM, Joel Koshy > > wrote: > > > > > > Are you using it from Java or Scala? i.e., are you using the > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer > > > > In 0.8.2 javaapi we explicitly set version 0 of the > > OffsetCommitRequest/OffsetFetchRequest which means it will > > commit/fetch to/from ZooKeeper only. 
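The sequencing Joel describes (shut the high-level consumers down *before* resetting offsets, so a stray auto-commit cannot overwrite the reset) can be sketched as an in-memory simulation; every name here is hypothetical and stands in for the real consumer and offset store, not the Kafka API:

```python
# In-memory sketch of the reset sequencing: stop auto-commits first,
# then write the new offsets. All classes here are hypothetical models.

class OffsetStore:
    """Stands in for wherever committed offsets live (ZooKeeper/Kafka)."""
    def __init__(self):
        self.committed = {}

    def commit(self, partition, offset):
        self.committed[partition] = offset

class HighLevelConsumer:
    """Auto-commits its in-memory position as long as it is running."""
    def __init__(self, store):
        self.store = store
        self.running = True
        self.position = {0: 500}  # pretend we had consumed up to offset 500

    def auto_commit(self):
        if self.running:
            for p, o in self.position.items():
                self.store.commit(p, o)

    def shutdown(self):
        self.running = False

def reset_offsets(store, consumer, new_offsets):
    consumer.shutdown()               # step 1: stop auto-commits first
    for p, o in new_offsets.items():  # step 2: now the reset is safe
        store.commit(p, o)

store = OffsetStore()
consumer = HighLevelConsumer(store)
reset_offsets(store, consumer, {0: 100})
consumer.auto_commit()  # a late auto-commit is now a no-op; the reset survives
```

Reversing the two steps is exactly the race Joel warns about: a final auto-commit at offset 500 would silently clobber the reset to 100.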
If you use the scala API you can > > create an OffsetCommitRequest with version set to 1 (which will allow > > you to commit to Kafka). > > > > Since we are doing an 0.8.2.1 release we will make the above more > > consistent. i.e., you can create OffsetCommitRequests with version 1 > > even from the javaapi. I will be updating the documentation on this to > > make it clearer. > > > > Thanks, > > > > Joel > > > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > > > Joel, > > > Looking at SimpleConsumer in the 0.8.2 code, it is using > > > OffsetCommitRequest and sending that over to a broker. > > > Is the broker storing that in ZK? > > > -Suren > > > > > > > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy > > > wrote: > > > > > > > > > Hi Chris, > > > > > > In 0.8.2, the simple consumer Java API supports committing/fetching > > > offsets that are stored in ZooKeeper. You don't need to issue any > > > ConsumerMetadataRequest for this. Unfortunately, the API currently > > > does not support fetching offsets that are stored in Kafka. > > > > > > Thanks, > > > > > > Joel > > > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: > > > > Hi, > > > > > > > > I am still using 0.8.1.1 because of the CPU use concerns. > > > > > > > > I'm confused about why the SimpleConsumer has: > > > > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > > > > > and > > > > > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > > > > > > > but no way that I can see to issue a ConsumerMetadataRequest, which is > > > > what I think when restarting my consumers so that they can begin > > > > working where they last left off (in the event that they were stopped > > > > for a while then restarted some time later, and new messages had come > > > > in). 
> > > > > > > > The fetchOffsets() works on time, usually it looks like you send it > > > > Earliest or Latest (beginning or end of what's currently in the > > > > stream). > > > > > > > > I realize the documentation says this: > > > > > > > > > > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a > > > > > significant amount of work not needed in the Consumer Groups: > > > > > > > > > > 1. You must keep track of the offsets in your application to know > > > > >where you left off consuming. > > > > > > > > > > But that's not really quite true ... not as long as commitOffsets() > > > > > has been provided. It seems the SimpleConsumer provides you with a > > > > > solution to only one half of the problem of offset management. > > > > > > > > Using some zookeeper python scripts I wrote I can see that the > > > > commitOffsets() is doing its job and writing to > > > > > > > > > > > > /consumers/myGroupId/offsets/myTopic/0 > > > > > > > > > > > > That has this value: > > > > > > > > ('32757408', ZnodeStat(czxid=2211679, mzxid=
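As a small companion to Chris's ZooKeeper inspection above, here is a sketch of the path layout and value format that commitOffsets() writes for the high-level consumer. The helper names are hypothetical; in practice you would read the znode with a ZooKeeper client such as kazoo.

```python
# Sketch of the committed-offset znode layout used by the high-level
# consumer: /consumers/<group>/offsets/<topic>/<partition>, with the
# offset stored as a decimal string. Helper names are hypothetical.

def offset_znode_path(group, topic, partition):
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

def parse_offset(znode_value):
    # e.g. b"32757408", as seen in the thread above
    return int(znode_value)

path = offset_znode_path("myGroupId", "myTopic", 0)
offset = parse_offset(b"32757408")
```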
Re: big cpu jump on producer in face of broker outage
This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you try that version or 0.8.2.1 which is being voted now. Thanks, Jun On Thu, Feb 19, 2015 at 10:42 AM, Steven Wu wrote: > forgot to mention in case it matters > producer: 0.8.2-beta > broker: 0.8.1.1 > > On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu wrote: > > > I think this is an issue caused by KAFKA-1788. > > > > I was trying to test producer resiliency to broker outage. In this > > experiment, I shutdown all brokers and see how producer behavior. > > > > Here are the observations > > 1) kafka producer can recover from kafka outage. i.e. send resumed after > > brokers came back > > 2) producer instance saw big cpu jump during outage. 28% -> 52% in one > > test. > > > > Note that I didn't observe cpu issue when new producer instance started > > with brokers outage. In this case, there are no messages accumulated in > the > > buffer, because KafkaProducer constructor failed with DNS lookup for > > route53 name. when brokers came up, my wrapper re-created KafkaProducer > > object and recover from outage with sending messages. > > > > Here is the cpu graph for a running producer instance where broker outage > > happened in the middle of test run. it shows cpu problem. > > > > > https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing > > > > Here is the cpu graph for a new producer instance where broker outage > > happened before instance startup. cpu is good here. > > > > > https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing > > > > Note that producer is a 4-core m1.xlarge instance. x-axis is time, y-axis > > is cpu util. > > > > Thanks, > > Steven > > >
Re: data corruption like behavior
Zookeeper requires a majority of the nodes to be up for the service to be available. Kafka relies on Zookeeper to be always available. Thanks, Jun On Thu, Feb 19, 2015 at 11:15 AM, Karts wrote: > I have noticed some strange patterns when testing with the 0.8.1 build and > the 0.8.2 builds, and are listed below. > 1. So I setup a brand new cluster [3 kafka nodes with 3 zookeepers], > created 2 topics via the API calls, everything went fine and was > successfully able to view my messages in my consumers. There were no > messages lost. All is happy. Now, I change my setup to just have 1 > zookeeper. and do my test again, i lose some messages. I have checked that > all my configs are pointing to just 1 zookeeper and there was no mention of > the other 2 offline zookeepers. any idea why ? > 2. I revert back my settings to the original config, all 3 nodes are > online, no errors, send messages to same old topic, and i am still loosing > some messages. I deleted all the old topic files [to follow the 'cleanup' > process], create a new topic, and i am successfully able to receive all > messages. no loss whatsoever. > 3. Now in this state, i upgrade to 0.8.2, and try sending messages to the > topic that was made after the above cleanup, and i am losing messages > again. > > Am i making sense? I mean this is a very strange behavior, and if anyone > can comment on this [please correct me if i have done something 'very' > wrong].. > > Thanks.. >
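Jun's point about majorities can be made concrete: ZooKeeper serves requests only while a strict majority (a quorum) of the ensemble is up, which is why a 3-node ensemble tolerates one failure while a single node tolerates none. A small sketch of the arithmetic:

```python
# ZooKeeper quorum arithmetic: the service is available only while a
# strict majority of the configured ensemble is up.

def quorum_size(ensemble_size):
    # smallest strict majority
    return ensemble_size // 2 + 1

def tolerated_failures(ensemble_size):
    return ensemble_size - quorum_size(ensemble_size)

# ensemble of 3 -> quorum 2, tolerates 1 failure
# ensemble of 1 -> quorum 1, tolerates 0 failures
```

Note this is also why even-sized ensembles add little: 4 nodes need a quorum of 3 and still tolerate only one failure, same as 3 nodes.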
Re: [VOTE] 0.8.2.1 Candidate 1
+1 binding. Checked the md5 and quick start. Some minor comments: 1. The quickstart section would be better if it included the build step after the download and before starting the server. 2. There seems to be a bug in Gradle 1.1x with Java 8 causing the "gradle" initialization to fail: - FAILURE: Build failed with an exception. * Where: Build file '/home/guwang/Workspace/temp/kafka/build.gradle' line: 199 * What went wrong: A problem occurred evaluating root project 'kafka'. > Could not create task of type 'ScalaDoc'. -- Downgrading Java to 1.7 resolves this issue. Guozhang On Wed, Feb 18, 2015 at 7:56 PM, Connie Yang wrote: > +1 > On Feb 18, 2015 7:23 PM, "Matt Narrell" wrote: > > > +1 > > > > > On Feb 18, 2015, at 7:56 PM, Jun Rao wrote: > > > > > > This is the first candidate for release of Apache Kafka 0.8.2.1. This > > > only fixes one critical issue (KAFKA-1952) in 0.8.2.0. > > > > > > Release Notes for the 0.8.2.1 release > > > > > > https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/RELEASE_NOTES.html > > > > > > *** Please download, test and vote by Saturday, Feb 21, 7pm PT > > > > > > Kafka's KEYS file containing PGP keys we use to sign the release: > > > http://kafka.apache.org/KEYS in addition to the md5, sha1 > > > and sha2 (SHA256) checksum. > > > > > > * Release artifacts to be voted upon (source and binary): > > > https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/ > > > > > > * Maven artifacts to be voted upon prior to release: > > > https://repository.apache.org/content/groups/staging/ > > > > > > * scala-doc > > > https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/scaladoc/ > > > > > > * java-doc > > > https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/javadoc/ > > > > > > * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag > > > > > > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c1b4c58531343dce80232e0122d085fc687633f6 > > > > > > /*** > > > > > > Thanks, > > > > > > Jun > > > > > -- -- Guozhang
data corruption like behavior
I have noticed some strange patterns when testing with the 0.8.1 build and the 0.8.2 builds, listed below. 1. I set up a brand new cluster [3 Kafka nodes with 3 ZooKeepers] and created 2 topics via the API calls; everything went fine and I was successfully able to view my messages in my consumers. There were no messages lost. All is happy. Now I change my setup to have just 1 ZooKeeper and run my test again, and I lose some messages. I have checked that all my configs point to just the 1 ZooKeeper and there was no mention of the other 2 offline ZooKeepers. Any idea why? 2. I revert my settings to the original config, all 3 nodes are online with no errors, I send messages to the same old topic, and I am still losing some messages. I deleted all the old topic files [to follow the 'cleanup' process], created a new topic, and I am successfully able to receive all messages. No loss whatsoever. 3. Now in this state, I upgrade to 0.8.2, try sending messages to the topic that was made after the above cleanup, and I am losing messages again. Am I making sense? This is very strange behavior; if anyone can comment on this [please correct me if I have done something 'very' wrong].. Thanks..
Re: Broker w/ high memory due to index file sizes
Are there any measurement techniques for broker memory configuration? We do have a large load, with a max throughput of 200MB/s. What do you suggest as the recommended memory config for 5 brokers to handle such loads? On Wed, Feb 18, 2015 at 7:13 PM, Jay Kreps wrote: > 40G is really huge, generally you would want more like 4G. Are you sure you > need that? Not sure what you mean by lsof and index files being too large, > but the index files are memory mapped so they should be able to grow > arbitrarily large and their memory usage is not counted in the java heap > (in fact by having such a large heap you are taking away OS memory from > them). > > -Jay > > On Wed, Feb 18, 2015 at 4:13 PM, Zakee wrote: > > > I am running a cluster of 5 brokers with 40G ms/mx for each. I found one > of > > the brokers is constantly using above ~90% of memory for jvm.heapUsage. I > > checked from lsof output that the size of the index files for this broker > > is too large. > > > > Not sure what is going on with this one broker in the cluster? Why would > > the index file sizes be so hugely different on one broker? Any ideas? > > > > > > Regards > > Zakee
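For what it's worth, a back-of-the-envelope sketch of the sizing logic Jay describes (my own rule of thumb, not an official Kafka formula): keep the heap modest, around 4G, and budget the rest of the machine's RAM for the OS page cache, which is what actually backs the memory-mapped index files and recently written log segments.

```python
# Rule-of-thumb sketch (an assumption, not an official Kafka formula):
# size the OS page cache to absorb some number of seconds of writes, so
# recent segments and index files are served from memory, and keep the
# JVM heap small so it does not steal that page cache.

def page_cache_needed_gb(throughput_mb_per_s, buffer_seconds=30):
    """Page cache needed to hold `buffer_seconds` worth of writes, in GB."""
    return throughput_mb_per_s * buffer_seconds / 1024.0

# 200MB/s spread over 5 brokers is ~40MB/s per broker:
per_broker = page_cache_needed_gb(200 / 5)  # ~1.2G of page cache for 30s of writes
```

Even with generous multiples of this figure for consumer lag, the arithmetic points the same way Jay does: the bottleneck is OS page cache, not JVM heap, so a 40G heap mostly just takes memory away from the cache.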
Re: Consuming a snapshot from log compacted topic
> If I consumed up to the log end offset and log compaction happens in > between, I would have missed some messages. Compaction actually only runs on the rolled over segments (not the active - i.e., latest segment). The log-end-offset will be in the latest segment which does not participate in compaction. > > The log end offset is just the end of the committed messages in the log > > (the last thing the consumer has access to). It isn't the same as the > > cleaner point but is always later than it so it would work just as well. > > Isn't this just roughly the same value as using c.getOffsetsBefore() with a > partitionRequestTime of -1? > > > Although its always later than the cleaner point, surely log compaction is > still an issue here. > > If I consumed up to the log end offset and log compaction happens in > between, I would have missed some messages. > > > My thinking was that if you knew the log cleaner point, you could: > > Make a note of the starting offset > Consume till end of log > Check my starting point is ahead of current cleaner point, otherwise loop. > > > I appreciate there is a chance I misunderstood your point. > > On 19 February 2015 at 18:02, Jay Kreps wrote: > > > The log end offset is just the end of the committed messages in the log > > (the last thing the consumer has access to). It isn't the same as the > > cleaner point but is always later than it so it would work just as well. > > > > -Jay > > > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell > > wrote: > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is > > > > along the lines of: we expose the log-end-offset (actually the high > > > > watermark) of the partition in the fetch response. However, this is > > > > not exposed to the consumer (either in the new ConsumerRecord class > > > > or the existing MessageAndMetadata class). If we did, then if you > > > > were to consume a record you can check that it has offsets up to the > > > > log-end offset. 
If it does then you would know for sure that you have > > > > consumed everything for that partition > > > > > > To confirm then, the log-end-offset is the same as the cleaner point? > > > > > > > > > > > > On 19 February 2015 at 03:10, Jay Kreps wrote: > > > > > > > Yeah I was thinking either along the lines Joel was suggesting or else > > > > adding a logEndOffset(TopicPartition) method or something like that. As > > > > Joel says the consumer actually has this information internally (we > > > return > > > > it with the fetch request) but doesn't expose it. > > > > > > > > -Jay > > > > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy > > wrote: > > > > > > > > > > > 2. Make the log end offset available more easily in the consumer. > > > > > > > > > > > > Was thinking something would need to be added in LogCleanerManager, > > > in > > > > > the > > > > > > updateCheckpoints function. Where would be best to publish the > > > > > information > > > > > > to make it more easily available, or would you just expose the > > > > > > offset-cleaner-checkpoint file as it is? > > > > > > Is it right you would also need to know which > > > offset-cleaner-checkpoint > > > > > > entry related to each active partition? > > > > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is > > > > > along the lines of: we expose the log-end-offset (actually the high > > > > > watermark) of the partition in the fetch response. However, this is > > > > > not exposed to the consumer (either in the new ConsumerRecord class > > > > > or the existing MessageAndMetadata class). If we did, then if you > > > > > were to consume a record you can check that it has offsets up to the > > > > > log-end offset. If it does then you would know for sure that you have > > > > > consumed everything for that partition. 
> > > > > > > > > > > Yes, was looking at this initially, but as we have 100-150 writes > > per > > > > > > second, it could be a while before there is a pause long enough to > > > > check > > > > > it > > > > > > has caught up. Even with the consumer timeout set to -1, it takes > > > some > > > > > time > > > > > > to query the max offset values, which is still long enough for more > > > > > > messages to arrive. > > > > > > > > > > Got it - thanks for clarifying. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On 18 February 2015 at 23:16, Joel Koshy > > > wrote: > > > > > > > > > > > > > > You are also correct and perceptive to notice that if you check > > > the > > > > > end > > > > > > > of > > > > > > > > the log then begin consuming and read up to that point > > compaction > > > > may > > > > > > > have > > > > > > > > already kicked in (if the reading takes a while) and hence you > > > > might > > > > > have > > > > > > > > an incomplete snapshot. > > > > > > > > > > > > > > Isn't it sufficient to just repeat the check at the end after > > > reading > > > > > > > the log and repeat until you are truly done? At least for th
Re: big cpu jump on producer in face of broker outage
forgot to mention in case it matters producer: 0.8.2-beta broker: 0.8.1.1 On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu wrote: > I think this is an issue caused by KAFKA-1788. > > I was trying to test producer resiliency to broker outage. In this > experiment, I shutdown all brokers and see how producer behavior. > > Here are the observations > 1) kafka producer can recover from kafka outage. i.e. send resumed after > brokers came back > 2) producer instance saw big cpu jump during outage. 28% -> 52% in one > test. > > Note that I didn't observe cpu issue when new producer instance started > with brokers outage. In this case, there are no messages accumulated in the > buffer, because KafkaProducer constructor failed with DNS lookup for > route53 name. when brokers came up, my wrapper re-created KafkaProducer > object and recover from outage with sending messages. > > Here is the cpu graph for a running producer instance where broker outage > happened in the middle of test run. it shows cpu problem. > > https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing > > Here is the cpu graph for a new producer instance where broker outage > happened before instance startup. cpu is good here. > > https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing > > Note that producer is a 4-core m1.xlarge instance. x-axis is time, y-axis > is cpu util. > > Thanks, > Steven >
big cpu jump on producer in face of broker outage
I think this is an issue caused by KAFKA-1788. I was trying to test producer resiliency to a broker outage. In this experiment, I shut down all brokers to see how the producer behaves. Here are the observations: 1) The Kafka producer can recover from a Kafka outage, i.e. sends resumed after the brokers came back. 2) The producer instance saw a big CPU jump during the outage: 28% -> 52% in one test. Note that I didn't observe the CPU issue when a new producer instance started while the brokers were down. In that case, no messages accumulated in the buffer, because the KafkaProducer constructor failed on the DNS lookup for the route53 name. When the brokers came up, my wrapper re-created the KafkaProducer object and recovered from the outage by sending messages. Here is the CPU graph for a running producer instance where the broker outage happened in the middle of the test run; it shows the CPU problem. https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing Here is the CPU graph for a new producer instance where the broker outage happened before instance startup; CPU is good here. https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing Note that the producer is a 4-core m1.xlarge instance; the x-axis is time, the y-axis is CPU util. Thanks, Steven
Re: Consuming a snapshot from log compacted topic
> The log end offset is just the end of the committed messages in the log > (the last thing the consumer has access to). It isn't the same as the > cleaner point but is always later than it so it would work just as well. Isn't this just roughly the same value as using c.getOffsetsBefore() with a partitionRequestTime of -1? Although it's always later than the cleaner point, surely log compaction is still an issue here. If I consumed up to the log end offset and log compaction happens in between, I would have missed some messages. My thinking was that if you knew the log cleaner point, you could: make a note of the starting offset; consume till the end of the log; check that my starting point is ahead of the current cleaner point, and otherwise loop. I appreciate there is a chance I misunderstood your point. On 19 February 2015 at 18:02, Jay Kreps wrote: > The log end offset is just the end of the committed messages in the log > (the last thing the consumer has access to). It isn't the same as the > cleaner point but is always later than it so it would work just as well. > > -Jay > > On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell > wrote: > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is > > > along the lines of: we expose the log-end-offset (actually the high > > > watermark) of the partition in the fetch response. However, this is > > > not exposed to the consumer (either in the new ConsumerRecord class > > > or the existing MessageAndMetadata class). If we did, then if you > > > were to consume a record you can check that it has offsets up to the > > > log-end offset. If it does then you would know for sure that you have > > > consumed everything for that partition > > > > To confirm then, the log-end-offset is the same as the cleaner point? > > > > > > > > On 19 February 2015 at 03:10, Jay Kreps wrote: > > > > > Yeah I was thinking either along the lines Joel was suggesting or else > > > adding a logEndOffset(TopicPartition) method or something like that. 
As > > > Joel says the consumer actually has this information internally (we > > return > > > it with the fetch request) but doesn't expose it. > > > > > > -Jay > > > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy > wrote: > > > > > > > > > 2. Make the log end offset available more easily in the consumer. > > > > > > > > > > Was thinking something would need to be added in LogCleanerManager, > > in > > > > the > > > > > updateCheckpoints function. Where would be best to publish the > > > > information > > > > > to make it more easily available, or would you just expose the > > > > > offset-cleaner-checkpoint file as it is? > > > > > Is it right you would also need to know which > > offset-cleaner-checkpoint > > > > > entry related to each active partition? > > > > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is > > > > along the lines of: we expose the log-end-offset (actually the high > > > > watermark) of the partition in the fetch response. However, this is > > > > not exposed to the consumer (either in the new ConsumerRecord class > > > > or the existing MessageAndMetadata class). If we did, then if you > > > > were to consume a record you can check that it has offsets up to the > > > > log-end offset. If it does then you would know for sure that you have > > > > consumed everything for that partition. > > > > > > > > > Yes, was looking at this initially, but as we have 100-150 writes > per > > > > > second, it could be a while before there is a pause long enough to > > > check > > > > it > > > > > has caught up. Even with the consumer timeout set to -1, it takes > > some > > > > time > > > > > to query the max offset values, which is still long enough for more > > > > > messages to arrive. > > > > > > > > Got it - thanks for clarifying. 
> > > > > > > > > > > > > > > > > > > > > > > > On 18 February 2015 at 23:16, Joel Koshy > > wrote: > > > > > > > > > > > > You are also correct and perceptive to notice that if you check > > the > > > > end > > > > > > of > > > > > > > the log then begin consuming and read up to that point > compaction > > > may > > > > > > have > > > > > > > already kicked in (if the reading takes a while) and hence you > > > might > > > > have > > > > > > > an incomplete snapshot. > > > > > > > > > > > > Isn't it sufficient to just repeat the check at the end after > > reading > > > > > > the log and repeat until you are truly done? At least for the > > > purposes > > > > > > of a snapshot? > > > > > > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote: > > > > > > > If you catch up off a compacted topic and keep consuming then > you > > > > will > > > > > > > become consistent with the log. > > > > > > > > > > > > > > I think what you are saying is that you want to create a > snapshot > > > > from > > > > > > the > > > > > > > Kafka topic but NOT do continual reads after that point. For > > > example > > > > you > > > > > > > might be creating a backup of the dat
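Will's three-step loop from this thread (note the starting offset, consume to the end of the log, then check the starting point is still ahead of the cleaner point, otherwise retry) can be sketched as a small simulation. All names here are hypothetical; this models the control flow against a toy compacted log, not the real consumer API.

```python
# Sketch of the proposed snapshot loop against a compacted topic: if the
# cleaner point advanced past where we started reading, compaction may
# have removed messages mid-read, so the snapshot is retried.

def take_snapshot(log):
    while True:
        start = log.start_offset()                    # note the starting offset
        records = log.read(start, log.end_offset())   # consume till end of log
        if start >= log.cleaner_point():
            # cleaner never passed our starting point during the read,
            # so nothing we consumed was compacted away: done
            return records
        # otherwise compaction overlapped the read: loop and retry

class FakeCompactedLog:
    """Hypothetical stand-in for one partition of a compacted topic."""
    def __init__(self, records):
        self._records = dict(records)   # offset -> value
        self._cleaner_point = 0

    def start_offset(self):
        return min(self._records)

    def end_offset(self):
        return max(self._records) + 1

    def cleaner_point(self):
        return self._cleaner_point

    def read(self, start, end):
        return {o: v for o, v in self._records.items() if start <= o < end}

log = FakeCompactedLog({0: "a", 1: "b", 2: "c"})
snap = take_snapshot(log)
```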
Broker ID disappears in Zookeeper
Hello, We're having the following issue with Kafka and/or Zookeeper: if a broker (id=1) is running and you start another broker with id=1, the new broker will exit saying "A broker is already registered on the path /brokers/ids/1". However, I noticed that when I query ZooKeeper, /brokers/ids/1 disappears. This behaviour doesn't make sense to us. The concern is that if we accidentally start up multiple brokers with the same ID (automatic restarts), we may end up with multiple brokers with the same ID running at the same time. Thoughts? Kafka: 0.8.2 Zookeeper: 3.4.5
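For context on the error message: broker registration is done with an ephemeral ZooKeeper znode, whose creation fails if the path already exists, and which vanishes automatically when its owning session ends (one way the path can later disappear). A toy model of those semantics, with all names hypothetical:

```python
# Toy model of ephemeral-znode registration under /brokers/ids/<id>.
# It illustrates two properties: a second create on the same path fails,
# and ephemeral nodes disappear when the owning session expires.

class NodeExistsError(Exception):
    pass

class FakeZooKeeper:
    def __init__(self):
        self.ephemeral = {}  # path -> owning session id

    def create_ephemeral(self, path, session):
        if path in self.ephemeral:
            raise NodeExistsError(path)
        self.ephemeral[path] = session

    def expire_session(self, session):
        # all ephemeral nodes owned by the expired session are removed
        self.ephemeral = {p: s for p, s in self.ephemeral.items()
                          if s != session}

zk = FakeZooKeeper()
zk.create_ephemeral("/brokers/ids/1", session="broker-A")
try:
    zk.create_ephemeral("/brokers/ids/1", session="broker-B")
    duplicate_rejected = False
except NodeExistsError:
    duplicate_rejected = True   # second broker with id=1 is refused
zk.expire_session("broker-A")    # the registration znode is now gone
```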
Re: Consuming a snapshot from log compacted topic
The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point but is always later than it so it would work just as well. -Jay On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell wrote: > > I'm not sure if I misunderstood Jay's suggestion, but I think it is > > along the lines of: we expose the log-end-offset (actually the high > > watermark) of the partition in the fetch response. However, this is > > not exposed to the consumer (either in the new ConsumerRecord class > > or the existing MessageAndMetadata class). If we did, then if you > > were to consume a record you can check that it has offsets up to the > > log-end offset. If it does then you would know for sure that you have > > consumed everything for that partition > > To confirm then, the log-end-offset is the same as the cleaner point? > > > > On 19 February 2015 at 03:10, Jay Kreps wrote: > > > Yeah I was thinking either along the lines Joel was suggesting or else > > adding a logEndOffset(TopicPartition) method or something like that. As > > Joel says the consumer actually has this information internally (we > return > > it with the fetch request) but doesn't expose it. > > > > -Jay > > > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy wrote: > > > > > > > 2. Make the log end offset available more easily in the consumer. > > > > > > > > Was thinking something would need to be added in LogCleanerManager, > in > > > the > > > > updateCheckpoints function. Where would be best to publish the > > > information > > > > to make it more easily available, or would you just expose the > > > > offset-cleaner-checkpoint file as it is? > > > > Is it right you would also need to know which > offset-cleaner-checkpoint > > > > entry related to each active partition? 
> > > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is > > > along the lines of: we expose the log-end-offset (actually the high > > > watermark) of the partition in the fetch response. However, this is > > > not exposed to the consumer (either in the new ConsumerRecord class > > > or the existing MessageAndMetadata class). If we did, then if you > > > were to consume a record you can check that it has offsets up to the > > > log-end offset. If it does then you would know for sure that you have > > > consumed everything for that partition. > > > > > > > Yes, was looking at this initially, but as we have 100-150 writes per > > > > second, it could be a while before there is a pause long enough to > > check > > > it > > > > has caught up. Even with the consumer timeout set to -1, it takes > some > > > time > > > > to query the max offset values, which is still long enough for more > > > > messages to arrive. > > > > > > Got it - thanks for clarifying. > > > > > > > > > > > > > > > > > > > On 18 February 2015 at 23:16, Joel Koshy > wrote: > > > > > > > > > > You are also correct and perceptive to notice that if you check > the > > > end > > > > > of > > > > > > the log then begin consuming and read up to that point compaction > > may > > > > > have > > > > > > already kicked in (if the reading takes a while) and hence you > > might > > > have > > > > > > an incomplete snapshot. > > > > > > > > > > Isn't it sufficient to just repeat the check at the end after > reading > > > > > the log and repeat until you are truly done? At least for the > > purposes > > > > > of a snapshot? > > > > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote: > > > > > > If you catch up off a compacted topic and keep consuming then you > > > will > > > > > > become consistent with the log. 
> > > > > > > > > > > > I think what you are saying is that you want to create a snapshot > > > from > > > > > the > > > > > > Kafka topic but NOT do continual reads after that point. For > > example > > > you > > > > > > might be creating a backup of the data to a file. > > > > > > > > > > > > I agree that this isn't as easy as it could be. As you say the > only > > > > > > solution we have is that timeout which doesn't differentiate > > between > > > GC > > > > > > stall in your process and no more messages left so you would need > > to > > > tune > > > > > > the timeout. This is admittedly kind of a hack. > > > > > > > > > > > > You are also correct and perceptive to notice that if you check > the > > > end > > > > > of > > > > > > the log then begin consuming and read up to that point compaction > > may > > > > > have > > > > > > already kicked in (if the reading takes a while) and hence you > > might > > > have > > > > > > an incomplete snapshot. > > > > > > > > > > > > I think there are two features we could add that would make this > > > easier: > > > > > > 1. Make the cleaner point configurable on a per-topic basis. This > > > feature > > > > > > would allow you to control how long the full log is retained and > > when > > > > > > compaction can kick in. This would give a configurable SLA for > the
Re: Consuming a snapshot from log compacted topic
> I'm not sure if I misunderstood Jay's suggestion, but I think it is > along the lines of: we expose the log-end-offset (actually the high > watermark) of the partition in the fetch response. However, this is > not exposed to the consumer (either in the new ConsumerRecord class > or the existing MessageAndMetadata class). If we did, then if you > were to consume a record you can check that it has offsets up to the > log-end offset. If it does then you would know for sure that you have > consumed everything for that partition To confirm then, the log-end-offset is the same as the cleaner point? On 19 February 2015 at 03:10, Jay Kreps wrote: > Yeah I was thinking either along the lines Joel was suggesting or else > adding a logEndOffset(TopicPartition) method or something like that. As > Joel says the consumer actually has this information internally (we return > it with the fetch request) but doesn't expose it. > > -Jay > > On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy wrote: > > > > > 2. Make the log end offset available more easily in the consumer. > > > > > > Was thinking something would need to be added in LogCleanerManager, in > > the > > > updateCheckpoints function. Where would be best to publish the > > information > > > to make it more easily available, or would you just expose the > > > offset-cleaner-checkpoint file as it is? > > > Is it right you would also need to know which offset-cleaner-checkpoint > > > entry related to each active partition? > > > > I'm not sure if I misunderstood Jay's suggestion, but I think it is > > along the lines of: we expose the log-end-offset (actually the high > > watermark) of the partition in the fetch response. However, this is > > not exposed to the consumer (either in the new ConsumerRecord class > > or the existing MessageAndMetadata class). If we did, then if you > > were to consume a record you can check that it has offsets up to the > > log-end offset. 
If it does then you would know for sure that you have > > consumed everything for that partition. > > > > > Yes, was looking at this initially, but as we have 100-150 writes per > > > second, it could be a while before there is a pause long enough to > check > > it > > > has caught up. Even with the consumer timeout set to -1, it takes some > > time > > > to query the max offset values, which is still long enough for more > > > messages to arrive. > > > > Got it - thanks for clarifying. > > > > > > > > > > > > > > On 18 February 2015 at 23:16, Joel Koshy wrote: > > > > > > > > You are also correct and perceptive to notice that if you check the > > end > > > > of > > > > > the log then begin consuming and read up to that point compaction > may > > > > have > > > > > already kicked in (if the reading takes a while) and hence you > might > > have > > > > > an incomplete snapshot. > > > > > > > > Isn't it sufficient to just repeat the check at the end after reading > > > > the log and repeat until you are truly done? At least for the > purposes > > > > of a snapshot? > > > > > > > > On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote: > > > > > If you catch up off a compacted topic and keep consuming then you > > will > > > > > become consistent with the log. > > > > > > > > > > I think what you are saying is that you want to create a snapshot > > from > > > > the > > > > > Kafka topic but NOT do continual reads after that point. For > example > > you > > > > > might be creating a backup of the data to a file. > > > > > > > > > > I agree that this isn't as easy as it could be. As you say the only > > > > > solution we have is that timeout which doesn't differentiate > between > > GC > > > > > stall in your process and no more messages left so you would need > to > > tune > > > > > the timeout. This is admittedly kind of a hack. 
> > > > > > > > > > You are also correct and perceptive to notice that if you check the > > end > > > > of > > > > > the log then begin consuming and read up to that point compaction > may > > > > have > > > > > already kicked in (if the reading takes a while) and hence you > might > > have > > > > > an incomplete snapshot. > > > > > > > > > > I think there are two features we could add that would make this > > easier: > > > > > 1. Make the cleaner point configurable on a per-topic basis. This > > feature > > > > > would allow you to control how long the full log is retained and > when > > > > > compaction can kick in. This would give a configurable SLA for the > > reader > > > > > process to catch up. > > > > > 2. Make the log end offset available more easily in the consumer. > > > > > > > > > > -Jay > > > > > > > > > > > > > > > > > > > > On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell < > > w.f.funn...@gmail.com> > > > > > wrote: > > > > > > > > > > > We are currently using Kafka 0.8.1.1 with log compaction in order > > to > > > > > > provide streams of messages to our clients. > > > > > > > > > > > > As well as constantly consuming the stream, one of our use cases > > is
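Joel's "repeat the end-offset check" idea above can be made concrete with a small runnable sketch. `Log`, `log_end_offset`, and `read` here are hypothetical stand-ins for a partition plus consumer, not Kafka APIs; the point is only the termination condition of the loop:

```python
# Snapshotting a (compacted) topic by consuming to the log-end offset,
# then re-checking it: if it moved while we were reading, go around again.

class Log:
    def __init__(self, messages):
        self.messages = list(messages)  # (offset, key, value) triples

    def log_end_offset(self):
        return len(self.messages)

    def read(self, start, end):
        return self.messages[start:end]

class GrowingLog(Log):
    """Simulates one message arriving while the first batch is being read."""
    def read(self, start, end):
        batch = super().read(start, end)
        if not getattr(self, "_grew", False):
            self._grew = True
            self.messages.append((len(self.messages), "c", 4))
        return batch

def snapshot(log):
    # Consume up to the log-end offset, then re-read the log-end offset;
    # repeat until it stops moving, so late arrivals are not missed.
    snap, pos = [], 0
    target = log.log_end_offset()
    while pos < target:
        snap.extend(log.read(pos, target))
        pos = target
        target = log.log_end_offset()
    return snap

print(len(snapshot(Log([(0, "a", 1), (1, "b", 2), (2, "a", 3)]))))        # 3
print(len(snapshot(GrowingLog([(0, "a", 1), (1, "b", 2), (2, "a", 3)])))) # 4
```

The second case shows why a single up-front check is not enough: the growing log is only fully captured because the end offset is re-checked after each batch.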
Re: Simple Consumer and offsets
I see - yes, you can use the SimpleConsumer for that. However, your high-level consumers need to be shut down while you do that (otherwise they may auto-commit while you are resetting offsets). Thanks, Joel On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: > We are using the High Level Consumer API to interact with Kafka for our > normal use cases. > > However, on consumer restart in the case of consumer failures, we want to be > able to manually > reset offsets in certain situations. > And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) > It looked like instantiating a SimpleConsumer just to reset offsets on > restart was a viable option, while continuing to use the High Level Consumer > for our normal operations. Not sure if there is a better way that is > compatible across 0.8.1 and 0.8.2. > -Suren > > > On Thursday, February 19, 2015 10:25 AM, Joel Koshy > wrote: > > > Not sure what you mean by using the SimpleConsumer on failure > recovery. Can you elaborate on this? > > On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: > > Haven't used either one now. Sounds like 0.8.2.1 will help. > > We are using the High Level Consumer generally but are thinking to use the > > SimpleConsumer on failure recovery to set the offsets. > > Is that the recommended approach for this use case? > > Thanks. > > -Suren > > > > > > On Thursday, February 19, 2015 9:40 AM, Joel Koshy > > wrote: > > > > > > Are you using it from Java or Scala? i.e., are you using the > > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer > > > > In 0.8.2 javaapi we explicitly set version 0 of the > > OffsetCommitRequest/OffsetFetchRequest which means it will > > commit/fetch to/from ZooKeeper only. If you use the scala API you can > > create an OffsetCommitRequest with version set to 1 (which will allow > > you to commit to Kafka). > > > > Since we are doing an 0.8.2.1 release we will make the above more > > consistent. 
i.e., you can create OffsetCommitRequests with version 1 > > even from the javaapi. I will be updating the documentation on this to > > make it clearer. > > > > Thanks, > > > > Joel > > > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > > > Joel, > > > Looking at SimpleConsumer in the 0.8.2 code, it is using > > > OffsetCommitRequest and sending that over to a broker. > > > Is the broker storing that in ZK? > > > -Suren > > > > > > > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy > > > wrote: > > > > > > > > > Hi Chris, > > > > > > In 0.8.2, the simple consumer Java API supports committing/fetching > > > offsets that are stored in ZooKeeper. You don't need to issue any > > > ConsumerMetadataRequest for this. Unfortunately, the API currently > > > does not support fetching offsets that are stored in Kafka. > > > > > > Thanks, > > > > > > Joel > > > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: > > > > Hi, > > > > > > > > I am still using 0.8.1.1 because of the CPU use concerns. > > > > > > > > I'm confused about why the SimpleConsumer has: > > > > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > > > > > and > > > > > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > > > > > > > but no way that I can see to issue a ConsumerMetadataRequest, which is > > > > what I think when restarting my consumers so that they can begin > > > > working where they last left off (in the event that they were stopped > > > > for a while then restarted some time later, and new messages had come > > > > in). > > > > > > > > The fetchOffsets() works on time, usually it looks like you send it > > > > Earliest or Latest (beginning or end of what's currently in the > > > > stream). 
> > > > > > > > I realize the documentation says this: > > > > > > > > > > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a > > > > > significant amount of work not needed in the Consumer Groups: > > > > > > > > > > 1. You must keep track of the offsets in your application to know > > > > >where you left off consuming. > > > > > > > > > > But that's not really quite true ... not as long as commitOffsets() > > > > > has been provided. It seems the SimpleConsumer provides you with a > > > > > solution to only one half of the problem of offset management. > > > > > > > > Using some zookeeper python scripts I wrote I can see that the > > > > commitOffsets() is doing its job and writing to > > > > > > > > > > > > /consumers/myGroupId/offsets/myTopic/0 > > > > > > > > > > > > That has this value: > > > > > > > > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, > > > > ctime=1423777630972, > > > > > mtime=1424122117397, version=12568262, cversion=0, aversion=0, > > > > > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679)) > > > > > > > > > > > > Now the question is just how to retrieve that - do I really have to > > > > have my c
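The reset flow Joel describes (shut the auto-committing high-level consumers down first, rewrite the committed offset, then restart) can be sketched as follows. `OffsetStore` and `reset_offsets` are hypothetical names standing in for wherever offsets actually live (ZooKeeper or Kafka); none of this is a real Kafka API:

```python
# Sketch of resetting committed offsets on restart. The guard models why
# the high-level consumers must be stopped first: an in-flight
# auto-commit could race the reset and silently overwrite it.

class OffsetStore:
    def __init__(self):
        self.committed = {}

    def commit(self, group, tp, offset):
        self.committed[(group, tp)] = offset

    def fetch(self, group, tp):
        return self.committed.get((group, tp), 0)

def reset_offsets(store, group, targets, consumers_running):
    if consumers_running:
        raise RuntimeError("shut down the high-level consumers first")
    for tp, offset in targets.items():
        store.commit(group, tp, offset)

store = OffsetStore()
store.commit("g1", ("topic", 0), 500)  # last auto-commit before failure
reset_offsets(store, "g1", {("topic", 0): 120}, consumers_running=False)
print(store.fetch("g1", ("topic", 0)))  # 120
```

On restart, the high-level consumer would then pick up from the rewritten offset (120 here) instead of 500.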
Re: Simple Consumer and offsets
We are using the High Level Consumer API to interact with Kafka for our normal use cases. However, on consumer restart in the case of consumer failures, we want to be able to manually reset offsets in certain situations. And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) It looked like instantiating a SimpleConsumer just to reset offsets on restart was a viable option, while continuing to use the High Level Consumer for our normal operations. Not sure if there is a better way that is compatible across 0.8.1 and 0.8.2. -Suren On Thursday, February 19, 2015 10:25 AM, Joel Koshy wrote: Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this? On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: > Haven't used either one now. Sounds like 0.8.2.1 will help. > We are using the High Level Consumer generally but are thinking to use the > SimpleConsumer on failure recovery to set the offsets. > Is that the recommended approach for this use case? > Thanks. > -Suren > > > On Thursday, February 19, 2015 9:40 AM, Joel Koshy >wrote: > > > Are you using it from Java or Scala? i.e., are you using the > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer > > In 0.8.2 javaapi we explicitly set version 0 of the > OffsetCommitRequest/OffsetFetchRequest which means it will > commit/fetch to/from ZooKeeper only. If you use the scala API you can > create an OffsetCommitRequest with version set to 1 (which will allow > you to commit to Kafka). > > Since we are doing an 0.8.2.1 release we will make the above more > consistent. i.e., you can create OffsetCommitRequests with version 1 > even from the javaapi. I will be updating the documentation on this to > make it clearer. > > Thanks, > > Joel > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > > Joel, > > Looking at SimpleConsumer in the 0.8.2 code, it is using > > OffsetCommitRequest and sending that over to a broker. > > Is the broker storing that in ZK? 
> > -Suren > > > > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy > > wrote: > > > > > > Hi Chris, > > > > In 0.8.2, the simple consumer Java API supports committing/fetching > > offsets that are stored in ZooKeeper. You don't need to issue any > > ConsumerMetadataRequest for this. Unfortunately, the API currently > > does not support fetching offsets that are stored in Kafka. > > > > Thanks, > > > > Joel > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: > > > Hi, > > > > > > I am still using 0.8.1.1 because of the CPU use concerns. > > > > > > I'm confused about why the SimpleConsumer has: > > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > > > and > > > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > > > > > but no way that I can see to issue a ConsumerMetadataRequest, which is > > > what I think when restarting my consumers so that they can begin > > > working where they last left off (in the event that they were stopped > > > for a while then restarted some time later, and new messages had come > > > in). > > > > > > The fetchOffsets() works on time, usually it looks like you send it > > > Earliest or Latest (beginning or end of what's currently in the > > > stream). > > > > > > I realize the documentation says this: > > > > > > > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a > > > > significant amount of work not needed in the Consumer Groups: > > > > > > > > 1. You must keep track of the offsets in your application to know > > > >where you left off consuming. > > > > > > > > But that's not really quite true ... not as long as commitOffsets() has > > > > been provided. It seems the SimpleConsumer provides you with a > > > > solution to only one half of the problem of offset management. 
> > > > > > Using some zookeeper python scripts I wrote I can see that the > > > commitOffsets() is doing its job and writing to > > > > > > > > > /consumers/myGroupId/offsets/myTopic/0 > > > > > > > > > That has this value: > > > > > > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972, > > > > mtime=1424122117397, version=12568262, cversion=0, aversion=0, > > > > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679)) > > > > > > > > > Now the question is just how to retrieve that - do I really have to > > > have my client connect to ZK directly? If that's the case, future > > > upgrades would break (e.g. 0.8.2 having its own storage for commit > > > watermarks). > > > > > > > > > What was the intent here, and what's the advice on how to proceed > > > being that 0.8.2 is in an iffy state right now? > > > > > > > > > --Chris > > > > > > > > > > > >
Re: Simple Consumer and offsets
Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this? On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: > Haven't used either one now. Sounds like 0.8.2.1 will help. > We are using the High Level Consumer generally but are thinking to use the > SimpleConsumer on failure recovery to set the offsets. > Is that the recommended approach for this use case? > Thanks. > -Suren > > > On Thursday, February 19, 2015 9:40 AM, Joel Koshy > wrote: > > > Are you using it from Java or Scala? i.e., are you using the > javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer > > In 0.8.2 javaapi we explicitly set version 0 of the > OffsetCommitRequest/OffsetFetchRequest which means it will > commit/fetch to/from ZooKeeper only. If you use the scala API you can > create an OffsetCommitRequest with version set to 1 (which will allow > you to commit to Kafka). > > Since we are doing an 0.8.2.1 release we will make the above more > consistent. i.e., you can create OffsetCommitRequests with version 1 > even from the javaapi. I will be updating the documentation on this to > make it clearer. > > Thanks, > > Joel > > On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > > Joel, > > Looking at SimpleConsumer in the 0.8.2 code, it is using > > OffsetCommitRequest and sending that over to a broker. > > Is the broker storing that in ZK? > > -Suren > > > > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy > > wrote: > > > > > > Hi Chris, > > > > In 0.8.2, the simple consumer Java API supports committing/fetching > > offsets that are stored in ZooKeeper. You don't need to issue any > > ConsumerMetadataRequest for this. Unfortunately, the API currently > > does not support fetching offsets that are stored in Kafka. > > > > Thanks, > > > > Joel > > > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: > > > Hi, > > > > > > I am still using 0.8.1.1 because of the CPU use concerns. 
> > > > > > I'm confused about why the SimpleConsumer has: > > > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > > > and > > > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > > > > > but no way that I can see to issue a ConsumerMetadataRequest, which is > > > what I think when restarting my consumers so that they can begin > > > working where they last left off (in the event that they were stopped > > > for a while then restarted some time later, and new messages had come > > > in). > > > > > > The fetchOffsets() works on time, usually it looks like you send it > > > Earliest or Latest (beginning or end of what's currently in the > > > stream). > > > > > > I realize the documentation says this: > > > > > > > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a > > > > significant amount of work not needed in the Consumer Groups: > > > > > > > > 1. You must keep track of the offsets in your application to know > > > >where you left off consuming. > > > > > > > > But that's not really quite true ... not as long as commitOffsets() has > > > > been provided. It seems the SimpleConsumer provides you with a > > > > solution to only one half of the problem of offset management. > > > > > > Using some zookeeper python scripts I wrote I can see that the > > > commitOffsets() is doing its job and writing to > > > > > > > > > /consumers/myGroupId/offsets/myTopic/0 > > > > > > > > > That has this value: > > > > > > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972, > > > > mtime=1424122117397, version=12568262, cversion=0, aversion=0, > > > > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679)) > > > > > > > > > Now the question is just how to retrieve that - do I really have to > > > have my client connect to ZK directly? If that's the case, future > > > upgrades would break (e.g. 0.8.2 having its own storage for commit > > > watermarks). 
> > > > > > > > > What was the intent here, and what's the advice on how to proceed > > > being that 0.8.2 is in an iffy state right now? > > > > > > > > > --Chris > > > > > > > > > > > >
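The version semantics Joel explains in this thread (the 0.8.2 javaapi pins OffsetCommitRequest version 0, which routes to ZooKeeper; the Scala API can send version 1, which routes to Kafka-backed offset storage) can be pictured as a simple dispatch. The dict-backed stores below are a simulation of that routing rule only, not actual broker code:

```python
# Simulated routing of an offset commit by request version, per Joel's
# explanation: v0 -> ZooKeeper, v1 -> Kafka-backed offsets.

ZK_OFFSETS, KAFKA_OFFSETS = {}, {}

def handle_offset_commit(version, group, tp, offset):
    store = ZK_OFFSETS if version == 0 else KAFKA_OFFSETS
    store[(group, tp)] = offset

handle_offset_commit(0, "g1", ("t", 0), 100)  # javaapi (v0) -> ZooKeeper
handle_offset_commit(1, "g1", ("t", 0), 200)  # Scala API (v1) -> Kafka
print(ZK_OFFSETS[("g1", ("t", 0))])     # 100
print(KAFKA_OFFSETS[("g1", ("t", 0))])  # 200
```

This is why a 0.8.2 javaapi consumer cannot reach Kafka-stored offsets until 0.8.2.1 makes version 1 available from the javaapi as well.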
Re: Simple Consumer and offsets
Haven't used either one now. Sounds like 0.8.2.1 will help. We are using the High Level Consumer generally but are thinking to use the SimpleConsumer on failure recovery to set the offsets. Is that the recommended approach for this use case? Thanks. -Suren On Thursday, February 19, 2015 9:40 AM, Joel Koshy wrote: Are you using it from Java or Scala? i.e., are you using the javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer In 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest which means it will commit/fetch to/from ZooKeeper only. If you use the scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent. i.e., you can create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer. Thanks, Joel On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > Joel, > Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest > and sending that over to a broker. > Is the broker storing that in ZK? > -Suren > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy >wrote: > > > Hi Chris, > > In 0.8.2, the simple consumer Java API supports committing/fetching > offsets that are stored in ZooKeeper. You don't need to issue any > ConsumerMetadataRequest for this. Unfortunately, the API currently > does not support fetching offsets that are stored in Kafka. > > Thanks, > > Joel > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: > > Hi, > > > > I am still using 0.8.1.1 because of the CPU use concerns. 
> > > > I'm confused about why the SimpleConsumer has: > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > and > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > > > but no way that I can see to issue a ConsumerMetadataRequest, which is > > what I think when restarting my consumers so that they can begin > > working where they last left off (in the event that they were stopped > > for a while then restarted some time later, and new messages had come > > in). > > > > The fetchOffsets() works on time, usually it looks like you send it > > Earliest or Latest (beginning or end of what's currently in the > > stream). > > > > I realize the documentation says this: > > > > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a > > > significant amount of work not needed in the Consumer Groups: > > > > > > 1. You must keep track of the offsets in your application to know > > >where you left off consuming. > > > > > > But that's not really quite true ... not as long as commitOffsets() has > > > been provided. It seems the SimpleConsumer provides you with a solution > > > to only one half of the problem of offset management. > > > > Using some zookeeper python scripts I wrote I can see that the > > commitOffsets() is doing its job and writing to > > > > > > /consumers/myGroupId/offsets/myTopic/0 > > > > > > That has this value: > > > > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972, > > > mtime=1424122117397, version=12568262, cversion=0, aversion=0, > > > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679)) > > > > > > Now the question is just how to retrieve that - do I really have to > > have my client connect to ZK directly? If that's the case, future > > upgrades would break (e.g. 0.8.2 having its own storage for commit > > watermarks). > > > > > > What was the intent here, and what's the advice on how to proceed > > being that 0.8.2 is in an iffy state right now? 
> > > > > > --Chris > > > >
Re: Simple Consumer and offsets
Are you using it from Java or Scala? i.e., are you using the javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer In 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest which means it will commit/fetch to/from ZooKeeper only. If you use the scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent. i.e., you can create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer. Thanks, Joel On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: > Joel, > Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest > and sending that over to a broker. > Is the broker storing that in ZK? > -Suren > > > On Tuesday, February 17, 2015 12:22 PM, Joel Koshy > wrote: > > > Hi Chris, > > In 0.8.2, the simple consumer Java API supports committing/fetching > offsets that are stored in ZooKeeper. You don't need to issue any > ConsumerMetadataRequest for this. Unfortunately, the API currently > does not support fetching offsets that are stored in Kafka. > > Thanks, > > Joel > > On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: > > Hi, > > > > I am still using 0.8.1.1 because of the CPU use concerns. > > > > I'm confused about why the SimpleConsumer has: > > > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > > > and > > > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > > > but no way that I can see to issue a ConsumerMetadataRequest, which is > > what I think when restarting my consumers so that they can begin > > working where they last left off (in the event that they were stopped > > for a while then restarted some time later, and new messages had come > > in). 
> > > > The fetchOffsets() works on time, usually it looks like you send it > > Earliest or Latest (beginning or end of what's currently in the > > stream). > > > > I realize the documentation says this: > > > > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a > > > significant amount of work not needed in the Consumer Groups: > > > > > > 1. You must keep track of the offsets in your application to know > > >where you left off consuming. > > > > > > But that's not really quite true ... not as long as commitOffsets() has > > > been provided. It seems the SimpleConsumer provides you with a solution > > > to only one half of the problem of offset management. > > > > Using some zookeeper python scripts I wrote I can see that the > > commitOffsets() is doing its job and writing to > > > > > > /consumers/myGroupId/offsets/myTopic/0 > > > > > > That has this value: > > > > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972, > > > mtime=1424122117397, version=12568262, cversion=0, aversion=0, > > > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679)) > > > > > > Now the question is just how to retrieve that - do I really have to > > have my client connect to ZK directly? If that's the case, future > > upgrades would break (e.g. 0.8.2 having its own storage for commit > > watermarks). > > > > > > What was the intent here, and what's the advice on how to proceed > > being that 0.8.2 is in an iffy state right now? > > > > > > --Chris > > > >
Re: Simple Consumer and offsets
Joel, Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK? -Suren On Tuesday, February 17, 2015 12:22 PM, Joel Koshy wrote: Hi Chris, In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka. Thanks, Joel On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: > Hi, > > I am still using 0.8.1.1 because of the CPU use concerns. > > I'm confused about why the SimpleConsumer has: > > OffsetCommitResponse commitOffsets(OffsetCommitRequest request) > > and > > OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) > > but no way that I can see to issue a ConsumerMetadataRequest, which is > what I think when restarting my consumers so that they can begin > working where they last left off (in the event that they were stopped > for a while then restarted some time later, and new messages had come > in). > > The fetchOffsets() works on time, usually it looks like you send it > Earliest or Latest (beginning or end of what's currently in the > stream). > > I realize the documentation says this: > > > > *Downsides of using SimpleConsumer*The SimpleConsumer does require a > > significant amount of work not needed in the Consumer Groups: > > > > 1. You must keep track of the offsets in your application to know where > >you left off consuming. > > > > But that's not really quite true ... not as long as commitOffsets() has > > been provided. It seems the SimpleConsumer provides you with a solution to > > only one half of the problem of offset management. 
> > Using some zookeeper python scripts I wrote I can see that the > commitOffsets() is doing its job and writing to > > > /consumers/myGroupId/offsets/myTopic/0 > > > That has this value: > > ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972, > > mtime=1424122117397, version=12568262, cversion=0, aversion=0, > > ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679)) > > > Now the question is just how to retrieve that - do I really have to > have my client connect to ZK directly? If that's the case, future > upgrades would break (e.g. 0.8.2 having its own storage for commit > watermarks). > > > What was the intent here, and what's the advice on how to proceed > being that 0.8.2 is in an iffy state right now? > > > --Chris
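The znode value Chris dumped is just the committed offset as a decimal ASCII string. A small sketch of building the path and parsing the payload; actually fetching the raw bytes would need a live ZooKeeper client (e.g. kazoo's `client.get(path)`), which is assumed rather than shown here:

```python
# Build the consumer-offset znode path and parse its payload. The path
# layout matches what Chris observed; talking to a real ZooKeeper
# ensemble is out of scope for this sketch.

def offset_znode_path(group, topic, partition):
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

def parse_offset(raw):
    # ZK stores the committed offset as a decimal ASCII string.
    return int(raw.decode("ascii"))

print(offset_znode_path("myGroupId", "myTopic", 0))
# /consumers/myGroupId/offsets/myTopic/0
print(parse_offset(b"32757408"))  # 32757408
```

As Joel notes, in 0.8.2 the SimpleConsumer's fetchOffsets() retrieves this same value through the broker, so the client never has to touch ZooKeeper directly.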
Re: Custom partitioner in kafka-0.8.2.0
Thanks Mani for the quick response, sorry somehow I missed this javadoc :) tx SunilKalva On Thu, Feb 19, 2015 at 6:14 PM, Manikumar Reddy wrote: > Hi, > > In new producer, we can specify the partition number as part of > ProducerRecord. > > From javadocs: > *"If a valid partition number is specified that partition will be used when > sending the record. If no partition is specified but a key is present a > partition will be chosen using a hash of the key. If neither key nor > partition is present a partition will be assigned in a round-robin fashion. > "* > > > http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html > > > ManiKumar > > On Thu, Feb 19, 2015 at 6:05 PM, sunil kalva > wrote: > > > Hi > > I could not find a way to customize "Partitioner" class in new > KafkaProducer > > class, is it intentional? > > > > tx > > SunilKalva > > >
Re: Custom partitioner in kafka-0.8.2.0
Hi, In new producer, we can specify the partition number as part of ProducerRecord. From javadocs: *"If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion. "* http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html ManiKumar On Thu, Feb 19, 2015 at 6:05 PM, sunil kalva wrote: > Hi > I could not find a way to customize "Partitioner" class in new KafkaProducer > class, is it intentional? > > tx > SunilKalva >
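The three rules quoted from the ProducerRecord javadoc can be sketched as a small selection function. This mirrors the documented behaviour only, not the producer's actual implementation (the real producer hashes keys with murmur2, not Python's hash):

```python
# Partition selection per the ProducerRecord javadoc: an explicit
# partition wins; otherwise hash the key; otherwise round-robin.
# Illustrative sketch, not the real producer code.

import itertools

_round_robin = itertools.count()

def choose_partition(num_partitions, partition=None, key=None):
    if partition is not None:
        if not 0 <= partition < num_partitions:
            raise ValueError("invalid partition %d" % partition)
        return partition
    if key is not None:
        return hash(key) % num_partitions  # real producer uses murmur2
    return next(_round_robin) % num_partitions

print(choose_partition(4, partition=2))             # 2
print(0 <= choose_partition(4, key="user-42") < 4)  # True
print(choose_partition(4))  # 0 (first round-robin pick)
```

So in the new producer, custom placement is achieved by computing the partition yourself and passing it in the ProducerRecord, rather than by plugging in a Partitioner class.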
Custom partitioner in kafka-0.8.2.0
Hi I could not find a way to customize "Partitioner" class in new KafkaProducer class, is it intentional? tx SunilKalva