Fetching topic metadata issue
Hi,

Please help me with this issue: I am unable to read messages from the topic.
Re: Fetching topic metadata issue
Please give us more information:

Which release of Kafka are you using?
Did your consumer get any error?
Have you inspected the broker log(s)?

Cheers

On Sun, Sep 3, 2017 at 11:08 PM, Sagar Nadagoud <sagar.nadag...@wildjasmine.com> wrote:

> Hi,
>
> Please help me with this issue: I am unable to read messages from the topic.
Cluster expansion kafka 0.8.2.1
Hi,

We are planning to expand our cluster from 2 nodes to 8 nodes. The partition reassignment tool has options to move either whole topics or individual partitions.

Regardless of how many nodes are added, if I list all the topics in topics-to-move.json and all the brokers in the command below, will it give an equal distribution of partitions among the nodes?

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "2,3,4,5,6,7" --generate

After this, I am planning to apply the generated JSON with the --execute --reassignment-json-file option to rebalance the cluster. Will this cause any problems? This approach seems more general, so why is it not documented this way?

Thanks
Vimal
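For reference, the file passed to --topics-to-move-json-file is a small JSON document listing the topics to move. The sketch below builds such a document for a list of topic names; the class name, the method name, and the topic names "orders" and "payments" are hypothetical and only illustrate the shape of the file.

```java
import java.util.Arrays;
import java.util.List;

public class TopicsToMoveJson {

    // Builds the JSON text for a topics-to-move file from a list of topic names.
    // Kafka topic names are limited to letters, digits, '.', '_' and '-',
    // so no JSON escaping is applied here.
    public static String build(List<String> topics) {
        StringBuilder sb = new StringBuilder("{\"topics\": [");
        for (int i = 0; i < topics.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append("{\"topic\": \"").append(topics.get(i)).append("\"}");
        }
        return sb.append("], \"version\": 1}").toString();
    }

    public static void main(String[] args) {
        // "orders" and "payments" are hypothetical topic names.
        System.out.println(build(Arrays.asList("orders", "payments")));
    }
}
```

The printed text can be saved as topics-to-move.json and passed to the --generate step quoted above.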
Re: Potential Bug | GlobalStateManager checkpoint
Thanks Sameer, yes this looks like a bug. Can you file a JIRA?

On Mon, 4 Sep 2017 at 12:23 Sameer Kumar wrote:

> Hi,
>
> I am using InMemoryStore along with GlobalKTable. I came to realize that I
> was losing data whenever I restarted my streams application while it was
> consuming data from a Kafka topic, since it would always start from the last
> saved checkpoint. This works fine with RocksDB, since it is a persistent
> store, but for an in-memory store it should consume from the beginning.
>
> Debugging it further, I looked at the code for GlobalStateManagerImpl (this
> one works for GlobalKTable) and compared it with
> ProcessorStateManagerImpl (this one works for KTable).
>
> In ProcessorStateManagerImpl.checkpoint, we have added a check that the
> state store is persistent before writing the checkpoints. The same check
> is not there in the GlobalStateManagerImpl.checkpoint method. Do you think
> the same check needs to be added for GlobalStateManagerImpl?
>
>     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
>         log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets);
>         checkpointedOffsets.putAll(changelogReader.restoredOffsets());
>         for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
>             final String storeName = entry.getKey();
>             // only checkpoint the offset to the offsets file if
>             // it is persistent AND changelog enabled
>             if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {
>                 final String changelogTopic = storeToChangelogTopic.get(storeName);
>                 final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
>                 if (ackedOffsets.containsKey(topicPartition)) {
>                     // store the last offset + 1 (the log position after restoration)
>                     checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
>                 } else if (restoredOffsets.containsKey(topicPartition)) {
>                     checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
>                 }
>             }
>         }
>         // write the checkpoint file before closing, to indicate clean shutdown
>         try {
>             if (checkpoint == null) {
>                 checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
>             }
>             checkpoint.write(checkpointedOffsets);
>         } catch (final IOException e) {
>             log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
>         }
>     }
>
> Regards,
> -Sameer.
Potential Bug | GlobalStateManager checkpoint
Hi,

I am using InMemoryStore along with GlobalKTable. I came to realize that I was losing data whenever I restarted my streams application while it was consuming data from a Kafka topic, since it would always start from the last saved checkpoint. This works fine with RocksDB, since it is a persistent store, but for an in-memory store it should consume from the beginning.

Debugging it further, I looked at the code for GlobalStateManagerImpl (this one works for GlobalKTable) and compared it with ProcessorStateManagerImpl (this one works for KTable).

In ProcessorStateManagerImpl.checkpoint, we have added a check that the state store is persistent before writing the checkpoints. The same check is not there in the GlobalStateManagerImpl.checkpoint method. Do you think the same check needs to be added for GlobalStateManagerImpl?

    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
        log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets);
        checkpointedOffsets.putAll(changelogReader.restoredOffsets());
        for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
            final String storeName = entry.getKey();
            // only checkpoint the offset to the offsets file if
            // it is persistent AND changelog enabled
            if (entry.getValue().persistent() && storeToChangelogTopic.containsKey(storeName)) {
                final String changelogTopic = storeToChangelogTopic.get(storeName);
                final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
                if (ackedOffsets.containsKey(topicPartition)) {
                    // store the last offset + 1 (the log position after restoration)
                    checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
                } else if (restoredOffsets.containsKey(topicPartition)) {
                    checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
                }
            }
        }
        // write the checkpoint file before closing, to indicate clean shutdown
        try {
            if (checkpoint == null) {
                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
            }
            checkpoint.write(checkpointedOffsets);
        } catch (final IOException e) {
            log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
        }
    }

Regards,
-Sameer.
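The check being proposed can be illustrated in isolation. The following is a minimal, self-contained sketch and not the Kafka implementation: the Store interface, the class name, and the store names are hypothetical stand-ins, and offsets are keyed by store name rather than by TopicPartition to keep the example short. It only shows the idea that offsets for non-persistent stores are left out of the checkpoint, so an in-memory store would be rebuilt from the beginning after a restart.

```java
import java.util.HashMap;
import java.util.Map;

public class PersistentCheckpointSketch {

    // Hypothetical stand-in for the one state store property that matters here.
    public interface Store {
        boolean persistent();
    }

    // Returns only the offsets that belong to persistent stores.
    // Offsets of in-memory stores are skipped and therefore never checkpointed.
    public static Map<String, Long> offsetsToCheckpoint(Map<String, Store> stores,
                                                        Map<String, Long> offsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Store> entry : stores.entrySet()) {
            Long offset = offsets.get(entry.getKey());
            if (entry.getValue().persistent() && offset != null) {
                result.put(entry.getKey(), offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Store names and offsets are made up for illustration.
        Map<String, Store> stores = new HashMap<>();
        stores.put("rocksdb-store", () -> true);
        stores.put("in-memory-store", () -> false);

        Map<String, Long> offsets = new HashMap<>();
        offsets.put("rocksdb-store", 42L);
        offsets.put("in-memory-store", 17L);

        // Only the persistent store's offset remains in the result.
        System.out.println(offsetsToCheckpoint(stores, offsets));
    }
}
```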