The following are a sample encoder and decoder in Java.

    import java.nio.charset.Charset;
    import kafka.serializer.Decoder;
    import kafka.serializer.Encoder;
    import kafka.utils.VerifiableProperties;

    class StringEncoder implements Encoder<String> {
      private String encoding = null;

      // Kafka instantiates the encoder through this single-argument constructor.
      public StringEncoder(VerifiableProperties props) {
        if (props == null)
          encoding = "UTF8";
        else
          encoding = props.getString("serializer.encoding", "UTF8");
      }

      public byte[] toBytes(String s) {
        if (s == null)
          return null;
        else
          // Charset.forName avoids the checked UnsupportedEncodingException
          // thrown by getBytes(String).
          return s.getBytes(Charset.forName(encoding));
      }
    }

    class StringDecoder implements Decoder<String> {
      private String encoding = null;

      public StringDecoder(VerifiableProperties props) {
        if (props == null)
          encoding = "UTF8";
        else
          encoding = props.getString("serializer.encoding", "UTF8");
      }

      public String fromBytes(byte[] bytes) {
        return new String(bytes, Charset.forName(encoding));
      }
    }

Thanks,

Jun

On Wed, May 1, 2013 at 12:33 PM, Chris Curtin <curtin.ch...@gmail.com> wrote:

> Hi Jun
>
> I've added #1 and #2.
>
> I'll need to think about where to put #3, maybe even adding a 'tips and tricks' section?
>
> I've not had to do any encoder/decoders. Can anyone else offer some example code I can incorporate into an example?
>
> Thanks,
>
> Chris
>
> On Wed, May 1, 2013 at 11:45 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > Chris,
> >
> > Thanks. This is very helpful. I linked your wiki pages to our website. A few more comments:
> >
> > 1. Producer: The details of the meaning of request.required.acks are described in http://kafka.apache.org/08/configuration.html. It would be great if you can add a link to the description in your wiki.
> >
> > 2. High level consumer: Could you add the proper way of stopping the consumer? One just needs to call consumer.shutdown(). After this is called, the hasNext() call in the Kafka stream iterator will return false.
> >
> > 3. SimpleConsumer: We have the following api that returns the offset of the last message exposed to the consumer. The difference between the high watermark and the offset of the last consumed message tells you how many messages the consumer is behind the broker.
> > highWatermark(topic: String, partition: Int)
> >
> > Finally, it would be great if you can extend the wiki with customized encoder (Producer) and decoder (Consumer) at some point.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, May 1, 2013 at 6:44 AM, Chris Curtin <curtin.ch...@gmail.com> wrote:
> >
> > > I've tested my examples with the new (4/30) release and they work, so I've updated the documentation.
> > >
> > > Thanks,
> > >
> > > Chris
> > >
> > > On Mon, Apr 29, 2013 at 6:18 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > Thanks. I also updated your producer example to reflect a recent config change (broker.list => metadata.broker.list).
> > > >
> > > > Jun
> > > >
> > > > On Mon, Apr 29, 2013 at 11:19 AM, Chris Curtin <curtin.ch...@gmail.com> wrote:
> > > >
> > > > > Thanks, I missed that the addition of consumers can cause a re-balance. Thought it was only on Leader changes.
> > > > >
> > > > > I've updated the wording in the example.
> > > > >
> > > > > I'll pull down the beta and test my application then change the names on the properties.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Mon, Apr 29, 2013 at 11:58 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > > > Basically, every time a consumer joins a group, every consumer in the group gets a ZK notification and each of them tries to own a subset of the total number of partitions. A given partition is only assigned to one of the consumers in the same group. Once the ownership is determined, each consumer consumes messages coming from its partitions and manages the offset of those partitions. Since at any given point of time, a partition is only owned by one consumer, there won't be conflicts on updating the offsets. More details are described in the "consumer rebalancing algorithm" section of http://kafka.apache.org/07/design.html
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Mon, Apr 29, 2013 at 8:16 AM, Chris Curtin <curtin.ch...@gmail.com> wrote:
> > > > > >
> > > > > > > Jun, can you explain this a little better? I thought when using Consumer Groups that on startup Kafka connects to ZooKeeper and finds the last read offset for every partition in the topic being requested for the group. That is then the starting point for the consumer threads.
> > > > > > >
> > > > > > > If a second process starts while the first one is running with the same Consumer Group, won't the second one read the last offsets consumed by the already running process and start processing from there? Then as the first process syncs consumed offsets, won't the 2nd process's next update overwrite them?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Chris
> > > > > > >
> > > > > > > On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Chris,
> > > > > > > >
> > > > > > > > Thanks for the writeup. Looks great overall. A couple of comments.
> > > > > > > >
> > > > > > > > 1. At the beginning, it sounds like one can't run multiple processes of consumers in the same group. This is actually not true. We can create multiple instances of consumers for the same group in the same JVM or different JVMs. The consumers will auto-balance among themselves.
> > > > > > > >
> > > > > > > > 2. We have changed the name of some config properties. auto.commit.interval.ms is correct. However, zk.connect, zk.session.timeout.ms and zk.sync.time.ms are changed to zookeeper.connect, zookeeper.session.timeout.ms, and zookeeper.sync.time.ms, respectively.
> > > > > > > >
> > > > > > > > I will add a link to your wiki in our website.
> > > > > > > >
> > > > > > > > Thanks again.
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin <curtin.ch...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > I finished and published it this morning:
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> > > > > > > > >
> > > > > > > > > One question: when documenting the ConsumerConfig parameters I couldn't find a description for the 'auto.commit.interval.ms' setting. I found one for 'autocommit.interval.ms' (no '.' between auto and commit) in the Google Cache only. Which spelling is it? Also is my description of it correct?
> > > > > > > > >
> > > > > > > > > I'll take a look at custom encoders later this week. Today and Tuesday are going to be pretty busy.
> > > > > > > > >
> > > > > > > > > Please let me know if there are changes needed to the High Level Consumer page.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Chris
> > > > > > > > >
> > > > > > > > > On Mon, Apr 29, 2013 at 12:50 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Chris,
> > > > > > > > > >
> > > > > > > > > > Any update on the high level consumer example?
> > > > > > > > > >
> > > > > > > > > > Also, in the Producer example, it would be useful to describe how to write a customized encoder. One subtle thing is that the encoder needs a constructor that takes a single VerifiableProperties argument (https://issues.apache.org/jira/browse/KAFKA-869).
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
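
As a footnote to the consumer property renames mentioned in the thread (zk.connect, zk.session.timeout.ms and zk.sync.time.ms becoming zookeeper.connect, zookeeper.session.timeout.ms and zookeeper.sync.time.ms), here is a minimal sketch of how an existing Properties object might be carried over to the new names. Only the property names come from the thread. The class ConsumerConfigMigration, its migrate method and the sample values are hypothetical and are not part of Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: renames the old consumer
// property keys to the new names listed in this thread.
class ConsumerConfigMigration {
    private static final Map<String, String> RENAMED = new LinkedHashMap<String, String>();
    static {
        RENAMED.put("zk.connect", "zookeeper.connect");
        RENAMED.put("zk.session.timeout.ms", "zookeeper.session.timeout.ms");
        RENAMED.put("zk.sync.time.ms", "zookeeper.sync.time.ms");
    }

    // Returns a copy of the input in which renamed keys use their new
    // names; every other key (e.g. auto.commit.interval.ms) is copied as-is.
    public static Properties migrate(Properties old) {
        Properties migrated = new Properties();
        for (String name : old.stringPropertyNames()) {
            String newName = RENAMED.containsKey(name) ? RENAMED.get(name) : name;
            migrated.setProperty(newName, old.getProperty(name));
        }
        return migrated;
    }

    public static void main(String[] args) {
        Properties old = new Properties();
        // Placeholder values chosen for illustration only.
        old.setProperty("zk.connect", "localhost:2181");
        old.setProperty("zk.session.timeout.ms", "400");
        old.setProperty("auto.commit.interval.ms", "1000");
        System.out.println(migrate(old));
    }
}
```

The sketch copies rather than mutates, so the original Properties can still be used against an older client. The producer rename mentioned in the thread (broker.list => metadata.broker.list) is left out here because it belongs to a different config object.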