Re: kafka-python message offset?
I'm still not getting the necessary behavior. If I run on the command line, I get a series of messages:

    $ ./kafka-console-consumer.sh --zookeeper w.x.y.z:p --topic test --from-beginning
    Test
    test
    tests
    asdf

If I exclude the --from-beginning argument then it hangs, which indicates to me that the offset is currently at the end, awaiting new messages. If I run through Python, it also hangs, which is why I suspect it is insistently reading from the end. See below:

    print "Begin constructing SimpleConsumer"
    client = KafkaClient(servers)
    s_consumer = SimpleConsumer(client, topic, group_id,
                                partitions=[0],               # Not sure I need this
                                auto_commit=False,            # Tried with and without, doesn't fix the problem
                                auto_offset_reset='smallest'  # Tried with and without, doesn't fix the problem
                                )
    print "End constructing SimpleConsumer\n"

    print "Begin reading messages"
    try:
        for message in s_consumer:
            print "New message"
            print "\t", message.topic
            print "\t", message.partition
            print "\t", message.offset
            print "\t", message.key
            print "\t", message.value
    except Exception as e:
        print "Exception: ", e
    print "End reading messages\n"

    print "End all"

Output:

    Begin all
    Begin constructing SimpleConsumer
    End constructing SimpleConsumer
    Begin reading messages

It just hangs after that. I also tried with a KafkaConsumer instead of a SimpleConsumer and it does exactly the same thing. I'm not sure what to do.

Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com

From: Dana Powers <dana.pow...@rd.io>
Sent: Tuesday, July 28, 2015 09:58 PM
To: users@kafka.apache.org
Subject: Re: kafka-python message offset?

    Hi Keith,

    You can use the `auto_offset_reset` parameter to kafka-python's KafkaConsumer. It behaves the same as the Java consumer configuration of the same name. See http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.configure for more details on how to configure a KafkaConsumer instance.
    For fine-grained control over configuring topic/partition offsets, use KafkaConsumer.set_topic_partitions(). For the most control, pass a dictionary of {(topic, partition): offset, ...}. See http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.kafka.KafkaConsumer.set_topic_partitions

    -Dana

    On Tue, Jul 28, 2015 at 8:47 PM, Keith Wiley <keith.wi...@atigeo.com> wrote:

        I haven't found a way to specify that a consumer should read from the beginning, or from any other explicit offset, or that the offset should be "reset" in any way. The command-line shell scripts (which I believe simply wrap the Scala tools) have flags for this sort of thing. Is there any way to do this through the Python library? Thanks.
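[Editorial note: Dana's `{(topic, partition): offset, ...}` dictionary for set_topic_partitions() can be sketched in plain Python. This is an illustrative sketch only — the helper function, topic name, partition numbers, and offsets below are made up; nothing here connects to a broker.]

```python
# Hypothetical helper showing the shape of the mapping that
# KafkaConsumer.set_topic_partitions() accepts for explicit offsets.

def explicit_offsets(topic, partition_offsets):
    """Build a {(topic, partition): offset} dict for set_topic_partitions()."""
    return {(topic, partition): offset
            for partition, offset in partition_offsets.items()}

# Start partition 0 of 'test' at offset 0 and partition 1 at offset 42:
assignments = explicit_offsets("test", {0: 0, 1: 42})
print(assignments)  # {('test', 0): 0, ('test', 1): 42}
```

One would then pass `assignments` to `consumer.set_topic_partitions(assignments)` as described in the linked docs.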
Re: kafka-python message offset?
Are you using mumrah/kafka-python? I think so from context, but I know there's at least one other implementation rattling around these days. (-:

If that's what you're using, I can see two potential problems you might be having.

You can set the offset to some approximation of wherever you want, by using:

    s_consumer.seek(offset, whence)

pydoc kafka.consumer says:

    | seek(self, offset, whence)
    |     Alter the current offset in the consumer, similar to fseek
    |
    |     offset: how much to modify the offset
    |     whence: where to modify it from
    |         0 is relative to the earliest available offset (head)
    |         1 is relative to the current offset
    |         2 is relative to the latest known offset (tail)

So if you want to go to the beginning, do seek(0, 0). Seeking to the beginning or the end should be pretty reliable. You can seek (say) 100K messages relative to the beginning or the end or to the current offset, but with partitions and message arrival order and the like it's a bit of a crapshoot where you'll end up. The API will divide your offset by the number of partitions, then (I believe) apply that delta to each partition, hoping that the input stream is relatively well balanced. But again, seek(0, 0) or seek(0, 2) has always worked reasonably for me, so much so that I don't remember the default.

The other thing you might be running into is that if you are setting the partitions parameter to an array containing only zero, and your topic has more partitions than just partition #0, the producer might be publishing to a different partition. But you've told the client to read only from partition 0, so in that case you'd see no data. If you want to consume from every partition, don't pass in a partitions parameter.

    -Steve

On Wed, Jul 29, 2015 at 04:07:33PM +, Keith Wiley wrote:

    I'm still not getting the necessary behavior.
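[Editorial note: Steve's description of how seek(offset, whence) spreads a relative offset across partitions — divide the offset by the partition count, then apply that delta per partition — can be sketched in plain Python. This is an approximation for illustration, not kafka-python's actual implementation; the partition counts and offsets are invented.]

```python
# Rough sketch of the whence-relative seek logic described above.
# current/earliest/latest map partition -> offset on that partition.

def seek_targets(current, earliest, latest, offset, whence):
    """Return per-partition target offsets for a whence-relative seek."""
    n = len(current)
    delta = offset // n  # total offset split evenly across partitions
    if whence == 0:      # relative to earliest available offset (head)
        base = earliest
    elif whence == 1:    # relative to current offset
        base = current
    elif whence == 2:    # relative to latest known offset (tail)
        base = latest
    else:
        raise ValueError("whence must be 0, 1 or 2")
    return {p: base[p] + delta for p in current}

# Two partitions; seek 100 messages past the beginning:
earliest = {0: 0, 1: 0}
current = {0: 500, 1: 480}
latest = {0: 900, 1: 910}
print(seek_targets(current, earliest, latest, 100, 0))  # {0: 50, 1: 50}
```

As Steve notes, where you actually land with a relative seek depends on how well balanced the partitions are; seek(0, 0) and seek(0, 2) avoid that ambiguity entirely.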
Re: Connection to zk shell on Kafka
Sure. It would be great if you could also explain the reason why the absence of the jar creates this problem. I'm also surprised that the ZooKeeper that comes bundled with Kafka 0.8.2 does not have the jline jar.

Regards,
prabcs

On Wed, Jul 29, 2015 at 10:45 PM, Chris Barlock <barl...@us.ibm.com> wrote:

    You need the jline JAR file that ships with ZooKeeper.

    Chris
    IBM Tivoli Systems
    Research Triangle Park, NC
    (919) 224-2240
    Internet: barl...@us.ibm.com

--
There are only 10 types of people in the world: Those who understand binary, and those who don't
Re: kafka-python message offset?
Oh, I'm sorry. If I use the KafkaConsumer class instead of the SimpleConsumer class (as you suggested), it works. Frustratingly, SimpleConsumer silently accepts the auto_offset_reset parameter without complaining that no such parameter exists, yet it doesn't work properly! But KafkaConsumer works, so I'm in better shape now. Thank you.

Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com
Re: kafka-python message offset?
Have you tried not setting a group_id in SimpleConsumer? If you have stored offsets in ZK for that group, and those offsets still exist on the server, the consumer will use them and not 'reset'. My hunch is that this is your problem. You might also consider enabling kafka debug logs (though not for the faint-of-heart). Try initializing SimpleConsumer like so:

```
s_consumer = SimpleConsumer(client, topic, None, auto_offset_reset='smallest')
```

General thought: SimpleConsumer.seek() is a poorly constructed api. We have not deprecated it yet, but I recommend switching to KafkaConsumer instead. The same goes for SimpleConsumer as a whole, actually. Its api is quite old and is maintained mostly to avoid breaking legacy installations.

Does KafkaConsumer.set_topic_partitions() work for your purposes? It will take absolute, but not relative, offsets. Combined w/ an auto_offset_reset policy, however, this should fulfill most use cases:

- start from head: auto_offset_reset='smallest'
- start from tail: auto_offset_reset='largest'
- start from ZK-stored offset w/ reset to head: group_id='foo', auto_offset_reset='smallest'
- start from ZK-stored offset w/ reset to tail: group_id='foo', auto_offset_reset='largest'
- start from offline-stored offset w/ reset to head: auto_offset_reset='smallest'; consumer.set_topic_partitions({('topic', 0): 1234})
- start from offline-stored offset w/ reset to tail: auto_offset_reset='largest'; consumer.set_topic_partitions({('topic', 0): 1234})

If there's another use-case here that you think should be covered, please hop over to github.com/mumrah/kafka-python

-Dana
(kafka-python maintainer; KafkaConsumer author)

On Wed, Jul 29, 2015 at 9:38 AM, Keith Wiley <keith.wi...@atigeo.com> wrote:

    Thanks. I got it to work if I use KafkaConsumer. It doesn't yet work with SimpleConsumer, and that includes seeking to 0,0. I'm curious why it isn't working.
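[Editorial note: Dana's configuration matrix can be sketched as plain Python data — just the keyword arguments one would hand to kafka-python's KafkaConsumer, with no broker involved. The helper function is hypothetical, and 'foo' and 1234 are the placeholder group id and offset from the list above.]

```python
# Illustrative sketch of the start-position matrix above as plain data.

def consumer_config(start, group_id=None, stored_offsets=None):
    """Return (kwargs, explicit_offsets) for the chosen start policy.

    start: 'head' (reset to smallest offset) or 'tail' (reset to largest)
    group_id: if set, resume from ZK-stored offsets for that group first
    stored_offsets: offline-stored offsets to pass to set_topic_partitions()
    """
    if start not in ("head", "tail"):
        raise ValueError("start must be 'head' or 'tail'")
    kwargs = {"auto_offset_reset": "smallest" if start == "head" else "largest"}
    if group_id is not None:
        kwargs["group_id"] = group_id
    return kwargs, stored_offsets

print(consumer_config("head"))
# ({'auto_offset_reset': 'smallest'}, None)
print(consumer_config("tail", group_id="foo"))
# ({'auto_offset_reset': 'largest', 'group_id': 'foo'}, None)
print(consumer_config("head", stored_offsets={("topic", 0): 1234}))
# ({'auto_offset_reset': 'smallest'}, {('topic', 0): 1234})
```

The kwargs would go to the KafkaConsumer constructor; the offsets dict, if any, would go to consumer.set_topic_partitions().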
Re: kafka-python message offset?
Thanks. I got it to work if I use KafkaConsumer. It doesn't yet work with SimpleConsumer, and that includes seeking to 0,0. I'm curious why it isn't working. It's frustrating because SimpleConsumer supports seek while KafkaConsumer doesn't offer a seek function, but at the same time, I can reset KafkaConsumer to the beginning with auto_offset_reset while SimpleConsumer has yet to work for me in any way at all, including with a seek. So far, neither class is optimal for me (SimpleConsumer doesn't work at all yet and KafkaConsumer has no seek function). I'm using kafka-python 0.9.4 btw, just whatever version came up when pip installed it.

Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com

From: Steve Miller <st...@idrathernotsay.com>
Sent: Wednesday, July 29, 2015 09:33 AM
To: users@kafka.apache.org
Subject: Re: kafka-python message offset?

    Are you using mumrah/kafka-python? I think so from context, but I know there's at least one other implementation rattling around these days. (-:

    If that's what you're using, I can see two potential problems you might be having. You can set the offset to some approximation of wherever you want, by using:

        s_consumer.seek(offset, whence)

    So if you want to go to the beginning, do seek(0, 0). Seeking to the beginning or the end should be pretty reliable. You can seek (say) 100K messages relative to the beginning or the end or to the current offset, but with partitions and message arrival order and the like it's a bit of a crapshoot where you'll end up. The API will divide your offset by the number of partitions, then (I believe) apply that delta to each partition.
Connection to zk shell on Kafka
Hi folks,

    /kafka/bin# ./zookeeper-shell.sh localhost:2182/
    Connecting to localhost:2182/
    Welcome to ZooKeeper!
    JLine support is disabled

    WATCHER::

    WatchedEvent state:SyncConnected type:None path:null

I'm running a 5-node zookeeper cluster on a 5-node kafka cluster (each kafka broker has 1 zookeeper server running). When I try connecting to the shell, the shell never says 'Connected'. However, if I try connecting to another standalone zookeeper which has no links to kafka, I'm able to connect:

    /kafka/bin# /zookeeper/scripts/zkCli.sh -server 127.0.0.1:2181
    Connecting to 127.0.0.1:2181
    Welcome to ZooKeeper!
    JLine support is enabled

    WATCHER::

    WatchedEvent state:SyncConnected type:None path:null
    [zk: 127.0.0.1:2181(CONNECTED) 0]

Am I missing something?

Thanks,
prabcs
Re: Kafka Mirror Maker
Mirror Maker does not have specific restrictions on cluster size. The error you saw was because the consumer was not able to talk to the broker. Can you try to use kafka-console-consumer to consume some data from your source cluster and see if it works? It should be under KAFKA_HOME/bin/

Jiangjie (Becket) Qin

On Tue, Jul 28, 2015 at 11:01 AM, Prabhjot Bharaj <prabhbha...@gmail.com> wrote:

    Hi,

    I'm using Mirror Maker with a cluster of 3 nodes and a cluster of 5 nodes. I would like to ask - is the number of nodes a restriction for Mirror Maker? Also, are there any other restrictions or properties that should be common across both the clusters so that they continue mirroring?

    I'm asking this because I've got this error while mirroring:

        [2015-07-28 17:51:10,943] WARN Fetching topic metadata with correlation id 0 for topics [Set(fromIndiaWithLove)] from broker [id:3,host:a10.2.3.4,port:9092] failed (kafka.client.ClientUtils$)
        java.nio.channels.ClosedChannelException
            at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
            at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
            at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
            at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
            at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
            at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
        [2015-07-28 17:51:18,955] WARN Fetching topic metadata with correlation id 0 for topics [Set(fromIndiaWithLove)] from broker [id:2,host:10.2.3.5,port:9092] failed (kafka.client.ClientUtils$)
        java.nio.channels.ClosedChannelException
            (same stack trace as above)
        [2015-07-28 17:51:27,043] WARN Fetching topic metadata with correlation id 0 for topics [Set(fromIndiaWithLove)] from broker [id:5,host:a10.2.3.6,port:9092] failed (kafka.client.ClientUtils$)
        java.nio.channels.ClosedChannelException
            (same stack trace as above)

    This is what my consumer config looks like:

        zookeeper.connect=10.2.3.4:2182
        zookeeper.connection.timeout.ms=100
        consumer.timeout.ms=-1
        group.id=dp-mirrorMaker-test-datap1
        shallow.iterator.enable=true
        auto.create.topics.enable=true

    I've used the default producer.properties in kafka/config/, which has these properties:

        metadata.broker.list=localhost:9092
        producer.type=sync
        compression.codec=none
        serializer.class=kafka.serializer.DefaultEncoder

    I'm running Mirror Maker via this command:

        /kafka_2.10-0.8.2.0/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ~/sourceCluster1Consumer.config --num.streams 1 --producer.config producer.properties --whitelist=.*

    Regards,
    prabcs
Re: kafka-python message offset?
I got it. It has been tricky getting both consumer classes to work since they are not very similar. I configured one incorrectly because they take their arguments in different orders (in one, topic comes before group, and in the other that order is reversed). Now that it works, I can also see that the message classes they return are different, so the contents must be teased out in slightly different ways. It's basically working now.

Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com

From: Dana Powers <dana.pow...@rd.io>
Sent: Wednesday, July 29, 2015 10:31 AM
To: users@kafka.apache.org
Subject: Re: kafka-python message offset?

    Have you tried not setting a group_id in SimpleConsumer? If you have stored offsets in ZK for that group, and those offsets still exist on the server, the consumer will use them and not 'reset'. My hunch is that this is your problem. You might also consider enabling kafka debug logs (though not for the faint-of-heart). Try initializing SimpleConsumer like so:

        s_consumer = SimpleConsumer(client, topic, None, auto_offset_reset='smallest')

    General thought: SimpleConsumer.seek() is a poorly constructed api. We have not deprecated it yet, but I recommend switching to KafkaConsumer instead. The same goes for SimpleConsumer as a whole, actually. Its api is quite old and is maintained mostly to avoid breaking legacy installations.

    Does KafkaConsumer.set_topic_partitions() work for your purposes? It will take absolute, but not relative, offsets.
Re: Connection to zk shell on Kafka
You need the jline JAR file that ships with ZooKeeper.

Chris
IBM Tivoli Systems
Research Triangle Park, NC
(919) 224-2240
Internet: barl...@us.ibm.com

From: Prabhjot Bharaj <prabhbha...@gmail.com>
To: users@kafka.apache.org, u...@zookeeper.apache.org
Date: 07/29/2015 01:13 PM
Subject: Connection to zk shell on Kafka

    Hi folks,

    /kafka/bin# ./zookeeper-shell.sh localhost:2182/
    Connecting to localhost:2182/
    Welcome to ZooKeeper!
    JLine support is disabled
    ...
Re: Connection to zk shell on Kafka
I'm a user of Kafka/ZooKeeper, not one of its developers, so I can't give you a technical explanation. I do agree that Kafka should ship the jline JAR if its zookeeper-shell depends on it.

Chris

From: Prabhjot Bharaj <prabhbha...@gmail.com>
To: u...@zookeeper.apache.org, d...@kafka.apache.org
Cc: users@kafka.apache.org
Date: 07/29/2015 01:27 PM
Subject: Re: Connection to zk shell on Kafka

    Sure. It would be great if you could also explain the reason why the absence of the jar creates this problem. I'm also surprised that the ZooKeeper that comes bundled with Kafka 0.8.2 does not have the jline jar.

    Regards,
    prabcs
Re: Cache Memory Kafka Process
Hi Ewen,

Thanks for the reply. The assumptions that you made for replication and partitions are correct: 120 is the total number of partitions, and the replication factor is 1 for all the topics. Does that mean that a broker will keep all the messages that are produced in memory, or only the unconsumed messages? Is there a way we can restrict this to only x number of messages or x MB of total data in memory?

Regards,
Nilesh Chhapru.

On Tuesday 28 July 2015 12:37 PM, Ewen Cheslack-Postava wrote:

    Nilesh,

    It's expected that a lot of memory is used for cache. This makes sense because, under the hood, Kafka mostly just reads and writes data to/from files. While Kafka does manage some in-memory data, mostly it is writing produced data (or replicated data) to log files and then serving those same messages to consumers directly out of the log files. It relies on OS-level file system caching to optimize how data is managed. Operating systems are already designed to do this well, so it's generally better to reuse this functionality than to try to implement a custom caching layer. So when you see most of your memory consumed as cache, that's because the OS has used the access patterns for data in those files to select which parts of different files seem most likely to be useful in the future. As Daniel's link points out, it's only doing this when that memory is not needed for some other purpose.

    This approach isn't always perfect. If you have too much data to fit in memory and you scan through it, performance will suffer. Eventually you will hit regions of files that are not in cache, and the OS will be forced to read those off disk, which is much slower than reading from cache.

    From your description I'm not sure if you have 120 partitions *per topic* or *total* across all topics. Let's go with the lesser, 120 partitions total. You also mention 3 brokers.
Dividing 120 partitions across 3 brokers, we get about 40 partitions for which each broker is the leader, which is data it definitely needs cached in order to serve consumers. You didn't mention the replication factor, so let's ignore it here and assume the lowest possible: only 1 copy of the data. Even so, it looks like you have ~8GB of memory (based on the free -m numbers), and at 15 MB/message with 40 partitions per broker, that's only 8192/(15*40) = ~14 messages per partition that would fit in memory, assuming it was all used for file cache. That's not much, so if your total data stored is much larger and you ever have to read through any old data, your throughput will likely suffer.

It's hard to say much more without understanding what your workload is like: whether you're consuming data other than what the Storm spout is consuming, the rate at which you're producing data, etc. However, my initial impression is that you may be trying to process too much data with too little memory and too little disk throughput.

If you want more details, I'd suggest reading this section of the docs, which further explains how a lot of this stuff works: http://kafka.apache.org/documentation.html#persistence

-Ewen

On Mon, Jul 27, 2015 at 11:19 PM, Nilesh Chhapru nilesh.chha...@ugamsolutions.com wrote:

Hi Ewen,

I am using 3 brokers with 12 topics and about 120-125 partitions, without any replication, and the message size is approx 15 MB/message. The problem is that when the cache memory increases and reaches the maximum available, performance starts degrading. I am also using a Storm spout as a consumer, which stops reading at times.

When I do a free -m on my broker node after 1/2 - 1 hour, the memory footprint is as follows:
1) Physical memory - 500 MB - 600 MB
2) Cache memory - 6.5 GB
3) Free memory - 50 - 60 MB

Regards,
Nilesh Chhapru.
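Ewen's back-of-the-envelope estimate can be reproduced directly. The figures (8 GB of cache from the free -m output, ~15 MB messages, 120 partitions over 3 brokers) all come from the thread:

```python
# Rough estimate from the thread: how many ~15 MB messages per partition
# fit in ~8 GB of page cache, with 40 leader partitions per broker?
total_cache_mb = 8192                # ~8 GB, from the `free -m` numbers
message_mb = 15                      # approximate message size
partitions_per_broker = 120 // 3     # 120 partitions spread over 3 brokers

messages_per_partition = total_cache_mb / (message_mb * partitions_per_broker)
print(round(messages_per_partition, 2))  # ~13.65, i.e. roughly 14 messages
```

With only ~14 messages per partition fitting in cache, any consumer that falls behind or rereads old data will quickly push the brokers to disk-bound reads, which matches the degradation described above.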
On Monday 27 July 2015 11:02 PM, Ewen Cheslack-Postava wrote:

Having the OS cache the data in Kafka's log files is useful, since it means that data doesn't need to be read back from disk when consumed. This is good for the latency and throughput of consumers. Usually this caching works out pretty well, keeping the latest data from your topics in cache and only pulling older data into memory if a consumer reads data from earlier in the log. In other words, by leveraging OS-level caching of files, Kafka gets an in-memory caching layer for free.

Generally you shouldn't need to clear this data; the OS should only be using memory that isn't being used anyway. Is there a particular problem you're encountering that clearing the cache would help with?

-Ewen

On Mon, Jul 27, 2015 at 2:33 AM, Nilesh Chhapru nilesh.chha...@ugamsolutions.com wrote:

Hi All,

I am facing issues with the Kafka broker process taking a lot of cache memory. I just wanted to know whether the process really needs that much cache memory, or whether I can clear the OS-level cache by setting a cron.

Regards,
Nilesh Chhapru.
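There is no Kafka setting that caps OS page-cache usage, but the amount of data kept on disk (and therefore eligible for caching) can be bounded with the broker's retention settings in server.properties. A sketch, with illustrative values, not recommendations:

```properties
# Per-broker defaults in server.properties (values are examples).
# Delete log segments older than 24 hours...
log.retention.hours=24
# ...or once a partition's log exceeds ~1 GB, whichever limit is hit first.
log.retention.bytes=1073741824
# Size of each log segment file; retention deletes whole segments at a time.
log.segment.bytes=536870912
```

Note the caveat from the discussion: retention actually deletes the data, it does not merely evict it from cache, so this is only appropriate if the old data is genuinely no longer needed.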
Re: Cache Memory Kafka Process
On Tue, Jul 28, 2015 at 11:29 PM, Nilesh Chhapru nilesh.chha...@ugamsolutions.com wrote:

Hi Ewen,

Thanks for the reply. The assumptions you made about replication and partitions are correct: 120 is the total number of partitions and the replication factor is 1 for all topics. Does that mean that a broker will keep all the messages that are produced in memory, or only the unconsumed messages?

The operating system is caching the data, not Kafka. So there's no policy in Kafka that controls caching at that level. If you have consumers that repeatedly consume old data, the operating system will cache those sections of the files. If consumers are normally at the end of the logs, the operating system will cache those parts of the log files. (In fact, this doesn't even happen at the granularity of messages; the cache operates at the granularity of pages: https://en.wikipedia.org/wiki/Page_cache)

But this is a good thing. If something else needed that memory, the OS would just get rid of the cached data, opting to read the data back from disk if it is needed again in the future. It's very unlikely that clearing any of this data would affect the performance of your workload. If you're seeing degraded performance due to memory usage, it probably means you're simply trying to access more data than fits in memory, and you end up being limited by disk throughput as data needs to be reloaded.

Is there a way we can restrict this to only x number of messages or x MB of total data in memory?

This works at the operating system level. You can adjust the retention policies, which would just delete the data (and by definition that will also take it out of cache), but you probably don't want to lose that data completely. Think of it this way: if you applied the type of restriction you're talking about, what data would you have discarded? Are any of your applications currently accessing the data that would have been discarded, e.g.
because they are resetting to the beginning of the log and scanning through the full data set? If the answer is yes, then another way to view the situation is that your applications are misbehaving, in the sense that they exhibit data access patterns that aren't actually required, accessing more data than necessary, which doesn't fit in memory and therefore reduces your throughput.

-Ewen

Regards,
Nilesh Chhapru.
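The point above, that caching happens at the granularity of pages rather than messages, can be observed directly. A small Python illustration, assuming a Linux host where /proc/meminfo exposes the same page-cache figure that `free -m` reports as cache:

```python
import mmap

# The OS caches file contents in fixed-size pages, not in Kafka messages.
print("page size:", mmap.PAGESIZE, "bytes")  # commonly 4096 on Linux x86-64

# On Linux, the total page cache in use is the "Cached" row of /proc/meminfo,
# the same figure `free -m` showed as ~6.5 GB on the broker in this thread.
try:
    with open("/proc/meminfo") as f:
        cached_kb = next(int(line.split()[1])
                         for line in f if line.startswith("Cached:"))
    print("page cache in use:", cached_kb // 1024, "MB")
except (FileNotFoundError, StopIteration):
    pass  # not on Linux, or /proc/meminfo has an unexpected format
```

A 15 MB message therefore occupies thousands of pages, each of which the kernel can cache or evict independently based on access patterns.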