vvcephei commented on pull request #8903:
URL: https://github.com/apache/kafka/pull/8903#issuecomment-646798478


   As a POC, I've verfied this tool by hand by dumping the log from a 
suppression over windowed data:
   
   `consumer.properties`:
   ```
   bootstrap.servers=localhost:9092
   key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer
   
default.windowed.key.serde.inner=org.apache.kafka.common.serialization.Serdes$StringSerde
   value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
   ```
   
   `bin/kafka-run-class.sh 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBufferChangelogDumpTool
 consumer.properties 
stream-soak-test-logData10MinuteSuppressedCount-store-changelog 0`
   
   The result is output like:
   ```
   ...
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] 
partition=[0] offset=[785652987] timestamp=[1592576961370] 
key=[[gke-k8s-sz-b1-us-central-default-pool-239c28d1-568g@1592528400000/9223372036854775807]]
 <tombstone>
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] 
partition=[0] offset=[785652988] timestamp=[1592529165408] 
key=[[gke-k8s-sz-b1-us-central-default-pool-5p6i165g-8vhf@1592529000000/9223372036854775807]]
 priorValue=[null] oldValue=[null] newValue=[1] 
serializedContext=[ProcessorRecordContext{topic='node-name-repartition', 
partition=0, offset=448463657, timestamp=1592529165408, 
headers=RecordHeaders(headers = [], isReadOnly = false)}]
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] 
partition=[0] offset=[785652989] timestamp=[1592529173959] 
key=[[gke-k8s-sz-b1-us-central-default-pool-tham13b7-x74o@1592529000000/9223372036854775807]]
 priorValue=[null] oldValue=[null] newValue=[1] 
serializedContext=[ProcessorRecordContext{topic='node-name-repartition', 
partition=0, offset=448463970, timestamp=1592529173959, 
headers=RecordHeaders(headers = [], isReadOnly = false)}]
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] 
partition=[0] offset=[785652990] timestamp=[1592576961370] 
key=[[gke-k8s-sz-b1-us-central-default-pool-03wh5r35-2ns2@1592528400000/9223372036854775807]]
 <tombstone>
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] 
partition=[0] offset=[785652991] timestamp=[1592576961370] 
key=[[gke-k8s-sz-b1-us-central-default-pool-6l5j5i59-16qk@1592528400000/9223372036854775807]]
 <tombstone>
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] 
partition=[0] offset=[785652992] timestamp=[1592576961370] 
key=[[gke-k8s-sz-b1-us-central-default-pool-sma04jt2-l838@1592528400000/9223372036854775807]]
 <tombstone>
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] 
partition=[0] offset=[785652993] timestamp=[1592529180997] 
key=[[gke-k8s-sz-b1-us-central-default-pool-t9aqx35k-x999@1592529000000/9223372036854775807]]
 priorValue=[null] oldValue=[null] newValue=[1] 
serializedContext=[ProcessorRecordContext{topic='node-name-repartition', 
partition=0, offset=448464292, timestamp=1592529180997, 
headers=RecordHeaders(headers = [], isReadOnly = false)}]
   topic=[stream-soak-test-logData10MinuteSuppressedCount-store-changelog] 
partition=[0] offset=[785652994] timestamp=[1592576961370] 
key=[[gke-k8s-sz-b1-us-central-default-pool-l21x28ba-4bui@1592528400000/9223372036854775807]]
 <tombstone>
   Consumed to end of topic.
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to