Re: kafka-python message offset?

2015-07-29 Thread Keith Wiley
I'm still not getting the necessary behavior.  If I run on the command line, I 
get a series of messages:


$ ./kafka-console-consumer.sh --zookeeper w.x.y.z:p --topic test --from-beginning
Test
test
tests
asdf


If I exclude the --from-beginning argument then it hangs, which indicates to me 
that the offset is currently at the end and awaiting new messages. If I run 
through python, it also hangs, which is why I suspect it is insistently reading 
from the end.  See below:


from kafka import KafkaClient, SimpleConsumer

print "Begin constructing SimpleConsumer"
client = KafkaClient(servers)
s_consumer = SimpleConsumer(client,
                            topic,
                            group_id,
                            partitions=[0],               # Not sure I need this
                            auto_commit=False,            # Tried with and without, doesn't fix the problem
                            auto_offset_reset='smallest'  # Tried with and without, doesn't fix the problem
                            )
print "End constructing SimpleConsumer\n"

print "Begin reading messages"
try:
    for message in s_consumer:
        print "  New message"
        print "  +", message.topic
        print "  +", message.partition
        print "  +", message.offset
        print "  +", message.key
        print "  +", message.value
except Exception as e:
    print "Exception: ", e
print "End reading messages\n"

print "End all"


Output:

Begin all

Begin constructing SimpleConsumer
End constructing SimpleConsumer

Begin reading messages


It just hangs after that.  I also tried with a KafkaConsumer instead of a 
SimpleConsumer and it does exactly the same thing.  I'm not sure what to do.


Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com






From: Dana Powers dana.pow...@rd.io
Sent: Tuesday, July 28, 2015 09:58 PM
To: users@kafka.apache.org
Subject: Re: kafka-python message offset?

Hi Keith,

you can use the `auto_offset_reset` parameter to kafka-python's
KafkaConsumer. It behaves the same as the Java consumer configuration of
the same name. See
http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.configure
for more details on how to configure a KafkaConsumer instance.

For fine-grained control wrt configuring topic/partition offsets, use
KafkaConsumer.set_topic_partitions() . For the most control, pass a
dictionary of {(topic, partition): offset, ...} .
see
http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.kafka.KafkaConsumer.set_topic_partitions

-Dana


Re: kafka-python message offset?

2015-07-29 Thread Keith Wiley
Oh, I'm sorry.  If I use the KafkaConsumer class instead of the SimpleConsumer 
class (as you suggested) it works.  Frustratingly, SimpleConsumer will take the 
auto_offset_reset parameter without complaining that no such parameter exists, 
yet it doesn't work properly!  But KafkaConsumer works, so I'm in better shape 
now.  Thank you.


Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com


Re: kafka-python message offset?

2015-07-29 Thread Keith Wiley
Thanks.  I got it to work if I use KafkaConsumer.  It doesn't yet work with 
SimpleConsumer, and that includes seeking to 0,0.  I'm curious why that isn't 
getting it going.  It's frustrating because SimpleConsumer supports seek while 
KafkaConsumer doesn't offer a seek function, but at the same time, I can reset 
KafkaConsumer to the beginning with auto_offset_reset while SimpleConsumer has 
yet to work for me in any way at all, including with a seek.  So far, neither 
class is optimal for me (SimpleConsumer doesn't work at all yet and 
KafkaConsumer has no seek function).

I'm using kafka-python 0.9.4 btw, just whatever version came up when pip 
installed it.


Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com






From: Steve Miller st...@idrathernotsay.com
Sent: Wednesday, July 29, 2015 09:33 AM
To: users@kafka.apache.org
Subject: Re: kafka-python message offset?

   Are you using mumrah/kafka-python?  I think so from context but I know 
there's at least one other implementation rattling around these days. (-:

   If that's what you're using, I can see two potential problems you might be 
having.  You can set the offset to some approximation of wherever you want by 
using:

s_consumer.seek(offset, whence)

pydoc kafka.consumer says:

 |  seek(self, offset, whence)
 |  Alter the current offset in the consumer, similar to fseek
 |
 |  offset: how much to modify the offset
 |  whence: where to modify it from
 |  0 is relative to the earliest available offset (head)
 |  1 is relative to the current offset
 |  2 is relative to the latest known offset (tail)

   So if you want to go to the beginning, do seek(0, 0).

   Seeking to the beginning or the end should be pretty reliable.  You can seek 
(say) 100K messages relative to the beginning or the end or to the current 
offset, but with partitions and message arrival order and the like it's a bit 
of a crapshoot where you'll end up.  The API will divide your offset by the 
number of partitions, then (I believe) apply that delta to each partition, 
which only works out if the input stream is relatively well balanced.
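To make that description concrete, here is a small sketch of splitting one relative seek across partitions. It follows Steve's account, which he hedges with "I believe", and is not taken from kafka-python's source. How the remainder of the division is distributed (one extra message each to the lowest-numbered partitions) is an assumption of this sketch, and the partition sizes are made up.

```python
def split_seek(offset, partitions):
    """Split a relative seek of `offset` messages into per-partition deltas.

    The deltas always sum to `offset`. Assumption: the remainder of the
    division goes, one message each, to the lowest-numbered partitions.
    """
    ordered = sorted(partitions)
    delta, rem = divmod(offset, len(ordered))
    return {p: delta + (1 if i < rem else 0) for i, p in enumerate(ordered)}


# Hypothetical, badly balanced topic: latest offset per partition.
tails = {0: 90000, 1: 5000, 2: 5000}

# "Go back 30,000 messages from the end" becomes -10,000 per partition...
deltas = split_seek(-30000, tails)
positions = {p: tails[p] + d for p, d in deltas.items()}
# ...which pushes partitions 1 and 2 to negative offsets, outside their logs.
print(positions)  # {0: 80000, 1: -5000, 2: -5000}
```

With an even spread of messages the per-partition deltas would land close to the intended point; with a skewed spread, as here, some partitions overshoot, which is the "crapshoot" described above.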

   But again, seek(0, 0) or seek(0, 2) has always worked reasonably for me, so 
much so that I don't remember the default.

   The other thing you might be running into is that if you are setting the 
partitions parameter to an array containing only zero, and your topic has more 
partitions than just partition #0, the producer might be publishing to a 
different partition.  But you've told the client to read only from partition 0, 
so in that case you'd see no data.  If you want to consume from every 
partition, don't pass in a partitions parameter.
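The partition pitfall can be shown with a toy model: below, a topic is just a dict from partition id to the messages stored there, and a consumer pinned to partitions=[0] reads only that one list. The topic layout and message names are invented for illustration, and no Kafka client is involved.

```python
def visible_messages(topic_data, partitions=None):
    """Return the messages a consumer limited to `partitions` would read.

    topic_data maps partition id -> list of messages in offset order.
    partitions=None stands for "all partitions", i.e. omitting the
    partitions parameter. Real consumers interleave partitions; simply
    concatenating them is a simplification of this sketch.
    """
    wanted = sorted(topic_data) if partitions is None else partitions
    seen = []
    for p in wanted:
        seen.extend(topic_data.get(p, []))
    return seen


# Hypothetical 3-partition topic where the producer never wrote to partition 0.
topic_data = {0: [], 1: ["m1", "m2"], 2: ["m3"]}

print(visible_messages(topic_data, partitions=[0]))  # []
print(visible_messages(topic_data))                  # ['m1', 'm2', 'm3']
```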

-Steve


Re: kafka-python message offset?

2015-07-29 Thread Keith Wiley
I got it.  It has been tricky getting both consumer classes to work since they 
are not very similar.  I configured one incorrectly because they take the 
arguments in different orders (in one topic comes before group and in the other 
that order is reversed).  Now that it works, I can also see that the message 
classes they return are different, so the contents must be teased out in 
slightly different ways.
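One way to hide the difference in message classes is to normalize both into a single record type. The sketch assumes the kafka-python 0.9.x shapes (check them against your installed version): KafkaConsumer yields KafkaMessage(topic, partition, offset, key, value), while iterating a SimpleConsumer yields OffsetAndMessage(offset, message) with the key and value on the inner Message. Local namedtuples with those field names stand in for the library's classes so the sketch runs without kafka-python; Record and normalize() are hypothetical names. On argument order, the 0.9.x constructors are assumed to be SimpleConsumer(client, group, topic, ...) versus KafkaConsumer(*topics, **configs).

```python
from collections import namedtuple

# Stand-ins assumed to mirror kafka-python 0.9.x's message namedtuples.
KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"])
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
Message = namedtuple("Message", ["magic", "attributes", "key", "value"])

# Hypothetical common record type for downstream code.
Record = namedtuple("Record", ["topic", "partition", "offset", "key", "value"])


def normalize(msg, topic=None, partition=None):
    """Convert a message from either consumer class into a Record.

    SimpleConsumer messages do not carry topic/partition, so the caller
    supplies them (e.g. the topic it subscribed to).
    """
    if isinstance(msg, KafkaMessage):
        return Record(msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    if isinstance(msg, OffsetAndMessage):
        inner = msg.message
        return Record(topic, partition, msg.offset, inner.key, inner.value)
    raise TypeError("unrecognized message type: %r" % (type(msg),))


a = normalize(KafkaMessage("test", 0, 5, None, b"asdf"))
b = normalize(OffsetAndMessage(7, Message(0, 0, None, b"tests")),
              topic="test", partition=0)
print(a)
print(b)
```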

It's basically working now.


Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com






From: Dana Powers dana.pow...@rd.io
Sent: Wednesday, July 29, 2015 10:31 AM
To: users@kafka.apache.org
Subject: Re: kafka-python message offset?

Have you tried not setting a group_id in SimpleConsumer? If you have stored
offsets in ZK for that group, and those offsets still exist on the server,
the consumer will use them and not 'reset'. My hunch is that this is your
problem. You might also consider enabling Kafka debug logs (though not for
the faint of heart). Try initializing SimpleConsumer like so:

```
s_consumer = SimpleConsumer(client, topic, None,
                            auto_offset_reset='smallest')
```
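The precedence described above can be written as a small decision function. This is a toy model of the rule as stated in this thread, not kafka-python's implementation: a group's stored offset is used whenever it still falls inside the range the broker holds, and auto_offset_reset only chooses between head and tail when there is no usable stored offset (for example with group_id=None). The function name and the offsets are hypothetical.

```python
def starting_offset(earliest, latest, committed, auto_offset_reset):
    """Pick where a consumer starts reading one partition (toy model).

    earliest / latest: the broker's head offset and tail (next offset to
    be written). committed: the group's stored offset, or None when there
    is no group or nothing was stored.
    """
    if committed is not None and earliest <= committed <= latest:
        return committed              # stored offset still valid: no reset
    if auto_offset_reset == "smallest":
        return earliest               # reset to head
    if auto_offset_reset == "largest":
        return latest                 # reset to tail
    raise ValueError("auto_offset_reset must be 'smallest' or 'largest'")


# Hypothetical partition holding offsets 0..3, so earliest=0 and latest=4.
# A group that already consumed everything has stored offset 4: the reset
# policy is never consulted and reading starts at the tail, so it "hangs".
print(starting_offset(0, 4, committed=4, auto_offset_reset="smallest"))     # 4
# With no stored offset, 'smallest' really does start at the head.
print(starting_offset(0, 4, committed=None, auto_offset_reset="smallest"))  # 0
```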

General thought: SimpleConsumer.seek() is a poorly constructed API. We have
not deprecated it yet, but I recommend switching to KafkaConsumer instead.
The same goes for SimpleConsumer as a whole, actually. Its API is quite
old and is maintained mostly to avoid breaking legacy installations.

Does KafkaConsumer.set_topic_partitions() work for your purposes? It will
take absolute, but not relative, offsets. Combined w/ an auto_offset_reset
policy, however, this should fulfill most use cases:

start from head: auto_offset_reset='smallest'
start from tail: auto_offset_reset='largest'
start from ZK-stored offset w/ reset to head: group_id='foo', auto_offset_reset='smallest'
start from ZK-stored offset w/ reset to tail: group_id='foo', auto_offset_reset='largest'
start from offline-stored offset w/ reset to head: auto_offset_reset='smallest'; consumer.set_topic_partitions({('topic', 0): 1234})
start from offline-stored offset w/ reset to tail: auto_offset_reset='largest'; consumer.set_topic_partitions({('topic', 0): 1234})
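Since set_topic_partitions() takes absolute offsets, an fseek-style (offset, whence) request like the one SimpleConsumer.seek() accepts can be translated by hand. The helper below is hypothetical: it assumes you already know each partition's earliest and latest offsets (fetching them is outside this sketch), applies the same offset to every partition rather than splitting it, and clamps results into the valid range, which is a design choice here rather than documented library behaviour. Its return value has the {(topic, partition): offset} shape mentioned above.

```python
def absolute_offsets(topic, heads, tails, offset, whence, current=None):
    """Translate (offset, whence) into {(topic, partition): absolute offset}.

    heads / tails / current: {partition: offset} for the earliest, latest
    and current positions. whence uses the fseek-style convention of
    SimpleConsumer.seek(): 0 = from head, 1 = from current, 2 = from tail.
    """
    if whence == 0:
        base = heads
    elif whence == 1:
        if current is None:
            raise ValueError("whence=1 needs the current offsets")
        base = current
    elif whence == 2:
        base = tails
    else:
        raise ValueError("whence must be 0, 1 or 2")

    result = {}
    for p in sorted(tails):
        target = base[p] + offset
        # Clamp into [head, tail], the range of valid starting positions.
        result[(topic, p)] = max(heads[p], min(tails[p], target))
    return result


# Hypothetical partition bounds for a 2-partition topic named "test".
heads = {0: 0, 1: 120}
tails = {0: 500, 1: 480}

rewind = absolute_offsets("test", heads, tails, 0, 0)       # like seek(0, 0)
last_100 = absolute_offsets("test", heads, tails, -100, 2)  # 100 before the tail
# Either dict could then be passed to consumer.set_topic_partitions(...).
print(rewind)
print(last_100)
```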

If there's another use-case here that you think should be covered, please
hop over to github.com/mumrah/kafka-python


-Dana
(kafka-python maintainer; KafkaConsumer author)



kafka-python message offset?

2015-07-28 Thread Keith Wiley
I haven’t found a way to specify that a consumer should read from the 
beginning, or from any other explicit offset, or that the offset should be 
“reset” in any way.  The command-line shell scripts (which I believe simply 
wrap the Scala tools) have flags for this sort of thing.  Is there any way to 
do this through the python library?

Thanks.

KafkaConfigurationError: No topics or partitions configured

2015-07-28 Thread Keith Wiley
I'm trying to get a basic consumer off the ground.  I can create the consumer 
but I can't do anything at the message level:


from kafka import KafkaConsumer

consumer = KafkaConsumer(topic,
                         group_id=group_id,
                         bootstrap_servers=[ip + ':' + port])

for m in consumer:
    print m

Note that I'm not even trying to use the message; I'm just trying to loop over 
the consumer.  I'm getting an exception there somehow:


Exception: No topics or partitions configured
Traceback (most recent call last):
  File "<timed exec>", line 3, in <module>
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py", line 290, in next
    return six.next(self._get_message_iterator())
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py", line 324, in fetch_messages
    raise KafkaConfigurationError('No topics or partitions configured')
KafkaConfigurationError: No topics or partitions configured


Any ideas?  I've been assured (although perhaps incorrectly) that the producer 
is configured, up and running.  Thanks.


Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com

[atigeo]http://atigeo.com/
[twitter]https://twitter.com/atigeo [LinkedIn] 
https://www.linkedin.com/company/atigeo  [YouTube] 
https://www.youtube.com/user/AtigeoXpatterns/  [blog] 
http://xpatterns.com/blog/



Re: KafkaConfigurationError: No topics or partitions configured

2015-07-28 Thread Keith Wiley
Thank you. It looks like I had the 'topic' slightly wrong.  I didn't realize it 
was case-sensitive.  I got past that error, but now I'm bumping up against 
another error:

Traceback (most recent call last):
...
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py", line 59, in __init__
    self.set_topic_partitions(*topics)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py", line 242, in set_topic_partitions
    self._get_commit_offsets()
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py", line 618, in _get_commit_offsets
    check_error(resp)
  File "/usr/local/lib/python2.7/dist-packages/kafka/common.py", line 230, in check_error
    raise response
kafka.common.FailedPayloadsError

I've been told the producer is generating byte array data, not string data.  
I'm unsure whether that is the cause or what to do about it.
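Since the "No topics or partitions configured" error above went away once the topic name's capitalization was fixed, a cheap pre-flight check could catch that mistake earlier. This hypothetical helper only compares strings: it assumes you already have the list of topic names the cluster reports (obtaining that list is left out), and the names in the example are invented.

```python
def check_topic(requested, available):
    """Return None if `requested` is an existing topic, else an error message.

    Topic names are case-sensitive, so a case-insensitive near match is
    reported as a likely typo rather than silently accepted.
    """
    if requested in available:
        return None
    near = [t for t in available if t.lower() == requested.lower()]
    if near:
        return ("topic %r not found; did you mean %r? "
                "(topic names are case-sensitive)" % (requested, near[0]))
    return "topic %r not found" % (requested,)


print(check_topic("test", ["Test", "logs"]))
```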

Keith Wiley
Senior Software Engineer, Atigeo
keith.wi...@atigeo.com






From: JIEFU GONG jg...@berkeley.edu
Sent: Tuesday, July 28, 2015 01:31 PM
To: users@kafka.apache.org
Subject: Re: KafkaConfigurationError: No topics or partitions configured

Can you confirm that there are indeed messages in the topic that you
published to?

bin/kafka-console-consumer.sh --zookeeper [details] --topic [topic] --from-beginning

That should be the right command, and you can use that to first verify that
messages have indeed been published to the topic in question.





--

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427