Hi Stig,

That's great! Thanks for all the info. Looking through the code, one small detail is the difference between the storm-kafka-client format and the storm-kafka format: the former uses 'firstOffset' and 'lastOffset', and the latter uses 'offset' and 'nextOffset'.

So, can I map with firstOffset = offset and lastOffset = nextOffset + 1? Looking through the code, it seems that nextOffset points just past the last consumed message, but I'm not sure.

Thanks
- Nasron

On Thu, Jan 11, 2018 at 5:43 PM, Stig Rohde Døssing wrote:
> Nasron,
>
> Okay, migrating a Trident spout is a very different thing. Trident spouts store their state in Storm's Zookeeper (unless you decide otherwise by setting transactional.zookeeper.servers in storm.yaml). This also applies to the storm-kafka-client Trident spout, so we won't need to move offsets into Kafka.
>
> The idea of stopping all the producers and starting at LATEST (or UNCOMMITTED_LATEST) is decent, but as you note there's a (small) risk of skipping tuples. In order to get Trident to commit something, you have to deploy the new topology with LATEST, start the producers again, wait until at least one commit happens, and then take the topology back down and redeploy with whatever your first poll strategy normally is. If the worker crashes before the spout manages to commit something, you will skip tuples.
>
> If you don't want to do that, here are my notes on storm-kafka -> storm-kafka-client for Trident:
>
> The storage formats and zk paths for the two spouts are a little different. Both spouts store their state as JSON maps, but some of the keys are different. I use ${} below to indicate variable substitution.
>
> The root path (in the following: zkRoot) for your spout's data is /${transactional.zookeeper.root from storm.yaml}/${txId you set with TopologyBuilder.newStream}/user.
>
> For the storm-kafka spout, the offsets are stored in one of the following two paths:
> ${zkRoot}/${topicName}partition_${partition} if you are using wildcard topic subscriptions
> ${zkRoot}/partition_${partition} otherwise
>
> The storage format for storm-kafka is as follows (I left out some irrelevant properties):
> { "${topicName}partition_${partition}": {"offset": 0, "nextOffset": 2 } } if you are using wildcard topic subscriptions
> { "partition_${partition}": {"offset": 0, "nextOffset": 2 } } otherwise
>
> For storm-kafka-client the zk path is
> ${zkRoot}/${topicName}@${partition}
>
> and the storage format is
> { "${topicName}@${partition}": {"firstOffset": 0, "nextOffset": 2 } }
>
> In order to migrate from storm-kafka to storm-kafka-client, we need to stop the topology and run a script that moves the offsets from the old location/format to the new location/format. There's no way to tell Trident to read from one path/format and write to another, so it has to be done offline. Once the offsets are migrated, the spout can be replaced in the topology and the topology can be redeployed.
>
> I might look at writing an application that can do this at some point, but it might take me a while. If you'd like to look at it yourself, here are some pointers on where to start:
> * This is where the offsets are written to Zookeeper, assuming you use an opaque spout: https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L184. You might want to look at this class for a bit (particularly the emit function), because it's pretty useful for understanding how and where Trident stores metadata for spouts.
> * The return value of https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java#L85 defines the format of what's being saved to Zookeeper for storm-kafka.
>   It's being wrapped in a map, so the full written value is { "${topicName}partition_${partition}": ${theReturnValue} } (see the storage format note above; it's different if you're not using wildcard subscriptions).
> * Similarly, for storm-kafka-client the return value of https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L106 defines the format of what that spout saves to Zookeeper (and expects to find).
> * You should use zkCli (it's in your zookeeper/bin directory) to explore your Zookeeper filesystem. It should be pretty easy to find your offsets in there with that tool.
>
> Sorry about the wall of text; this turned out to have a lot of detail to cover.
>
> 2018-01-10 21:40 GMT+01:00 Nasron Cheong:
>
>> Thanks Stig,
>>
>> So after some digging, I realized we are really migrating from the Kafka Trident emitter in storm-kafka to the Trident emitter in storm-kafka-client.
>>
>> As far as I can see, the offset information is still stored in zk, and the offset info for storm-kafka is (https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java#L140)
>>
>> However, this seems quite different from storm-kafka-client, which uses https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java#L56
>>
>> I'm not sure under which znode this information is stored, or whether the znode itself is different between the two implementations.
>>
>> Looks like I need a tool to copy the stored values in zk from the old storm-kafka format to storm-kafka-client?
>>
>> Another option, I suppose, is to:
>> - stop topic producers
>> - run the old code until it drains all topics
>> - start the new code with FirstPollOffsetStrategy.LATEST
>>
>> Although this seems risky.
>>
>> Thanks!
>>
>> - Nasron
>>
>> On Thu, Dec 21, 2017 at 4:23 PM, Stig Rohde Døssing wrote:
>>
>>> Hi Nasron,
>>>
>>> I don't believe there's currently a tool to help you migrate. We did it manually by writing a small utility that looked up the committed offsets in Storm's Zookeeper, opened a KafkaConsumer with the new consumer group id and committed the offsets for the appropriate partitions. We stopped our topologies, ran this utility and redeployed with the new spout.
>>>
>>> Assuming there isn't already a migration tool floating around somewhere, I think we could probably build some migration support into the storm-kafka-client spout. If the path to the old offsets in Storm's Zookeeper is given, we might be able to extract them and start up the new spout from there.
>>>
>>> 2017-12-19 21:59 GMT+01:00 Nasron Cheong:
>>>
>>>> Hi,
>>>>
>>>> I'm trying to determine the steps for migrating to storm-kafka-client in order to use the new Kafka client.
>>>>
>>>> It's not quite clear to me how offsets are migrated. Is there a specific set of steps to ensure offsets are moved from the ZK-based offsets into the Kafka-based offsets?
>>>>
>>>> Or is the original configuration respected, so that storm-kafka-client can mostly be a drop-in replacement?
>>>>
>>>> I want to avoid having spouts reset to the beginning of topics after deployment because of this change.
>>>>
>>>> Thanks.
>>>>
>>>> - Nasron
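The mapping question at the top of this thread can be made concrete with a small conversion sketch. It assumes, as suspected in the question, that storm-kafka's nextOffset points one past the last emitted message (exclusive), and that storm-kafka-client's lastOffset is the offset of the last emitted message (inclusive). Under those two assumptions the mapping is lastOffset = nextOffset - 1, not nextOffset + 1. Key names and the wrapping map follow the notes above; the function names are hypothetical, and any further keys the storm-kafka-client version you deploy expects should be checked against KafkaTridentSpoutEmitter before relying on this.

```python
import json


def old_partition_key(topic, partition, wildcard):
    """Key/znode name used by storm-kafka, per the notes in this thread."""
    if wildcard:
        return f"{topic}partition_{partition}"
    return f"partition_{partition}"


def new_partition_key(topic, partition):
    """Key/znode name used by storm-kafka-client, per the notes in this thread."""
    return f"{topic}@{partition}"


def convert_batch_meta(old_meta):
    """Map storm-kafka {"offset", "nextOffset"} to {"firstOffset", "lastOffset"}.

    Assumption: nextOffset is exclusive and lastOffset is inclusive,
    so lastOffset = nextOffset - 1. Other storm-kafka properties are dropped.
    """
    first = int(old_meta["offset"])
    next_offset = int(old_meta["nextOffset"])
    if next_offset <= first:
        # An empty batch has no inclusive last offset; refuse rather than guess.
        raise ValueError(f"empty batch: offset={first}, nextOffset={next_offset}")
    return {"firstOffset": first, "lastOffset": next_offset - 1}


def convert_node_value(old_json, topic, partition, wildcard):
    """Rewrite the JSON stored for one partition into the new key and format."""
    old_value = json.loads(old_json)
    old_meta = old_value[old_partition_key(topic, partition, wildcard)]
    new_value = {new_partition_key(topic, partition): convert_batch_meta(old_meta)}
    return json.dumps(new_value)
```

For example, with a non-wildcard subscription on a topic called "events" (a made-up name), `convert_node_value('{"partition_0": {"offset": 0, "nextOffset": 2}}', "events", 0, False)` yields `{"events@0": {"firstOffset": 0, "lastOffset": 1}}`.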

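The offline step Stig describes (stop the topology, move the offsets from the old location/format to the new one, then redeploy) could be sketched as below. It assumes the third-party kazoo ZooKeeper client, which is only imported when running against a real cluster, and it takes zkRoot, the old/new node names and the format conversion as inputs, so the copy logic does not depend on what the target format finally turns out to be. `copy_partition_state` and `migrate` are hypothetical names. The sketch copies rather than moves, leaving the storm-kafka nodes in place for rollback; confirm the actual znode layout with zkCli first, as suggested above.

```python
def copy_partition_state(zk, zk_root, old_key, new_key, convert,
                         dry_run=True, overwrite=False):
    """Read ${zk_root}/${old_key}, convert it, and write it to ${zk_root}/${new_key}.

    `zk` is anything exposing kazoo's exists/get/create/set methods.
    `convert` maps the old node's bytes to the new node's bytes.
    Returns the bytes written (or, in dry-run mode, that would be written).
    """
    old_path = f"{zk_root}/{old_key}"
    new_path = f"{zk_root}/{new_key}"
    if not zk.exists(old_path):
        raise LookupError(f"no storm-kafka state at {old_path}")
    old_data, _stat = zk.get(old_path)
    new_data = convert(old_data)
    if dry_run:
        print(f"[dry run] {old_path} -> {new_path}: {new_data!r}")
        return new_data
    if zk.exists(new_path):
        if not overwrite:
            raise RuntimeError(f"{new_path} already exists; refusing to overwrite")
        zk.set(new_path, new_data)
    else:
        # makepath=True also creates any missing parent znodes.
        zk.create(new_path, new_data, makepath=True)
    # The old node is deliberately kept so the migration can be rolled back.
    return new_data


def migrate(hosts, zk_root, key_pairs, convert, dry_run=True):
    """Copy each (old_key, new_key) pair on a live cluster; run with the topology stopped."""
    from kazoo.client import KazooClient  # third-party: pip install kazoo

    zk = KazooClient(hosts=hosts)
    zk.start()
    try:
        for old_key, new_key in key_pairs:
            copy_partition_state(zk, zk_root, old_key, new_key, convert, dry_run=dry_run)
    finally:
        zk.stop()
```

A cautious order would be: stop the topology, run `migrate` with `dry_run=True` and compare the printed values against what zkCli shows, then rerun with `dry_run=False` before redeploying with the storm-kafka-client spout.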