[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504200#comment-16504200 ] ASF GitHub Bot commented on KAFKA-6538: --- mjsax closed pull request #5103: KAFKA-6538: Changes to enhance ByteStore exceptions thrown from RocksDBStore with more human readable info URL: https://github.com/apache/kafka/pull/5103 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java index 14464e09b00..200b2d7870a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -173,30 +174,40 @@ public long approximateNumEntries() { @Override public V get(final K key) { -if (getTime.shouldRecord()) { -return measureLatency(new Action() { -@Override -public V execute() { -return typeConverter.outerValue(inner.get(typeConverter.innerKey(key))); -} -}, getTime); -} else { -return typeConverter.outerValue(inner.get(typeConverter.innerKey(key))); +try { +if (getTime.shouldRecord()) { +return measureLatency(new Action() { +@Override +public V execute() { +return typeConverter.outerValue(inner.get(typeConverter.innerKey(key))); +} +}, getTime); +} else { +return typeConverter.outerValue(inner.get(typeConverter.innerKey(key))); +} +} catch (final ProcessorStateException e) { +final String message = String.format(e.getMessage(), key); +throw new ProcessorStateException(message, e); } } @Override public void put(final K key, final V value) { -if (putTime.shouldRecord()) { -measureLatency(new Action() { -@Override -public V execute() { -inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value)); -return null; -} -}, putTime); -} else { -inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value)); +try { +if (putTime.shouldRecord()) { +measureLatency(new Action() { +@Override +public V execute() { +inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value)); +return null; +} +}, putTime); +} else { +inner.put(typeConverter.innerKey(key), typeConverter.innerValue(value)); +} +} catch (final ProcessorStateException e) { +final String message = String.format(e.getMessage(), key, value); +throw new ProcessorStateException(message, e); } } @@ -232,15 +243,20 @@ public V execute() { @Override public V delete(final K key) { -if (deleteTime.shouldRecord()) { -return measureLatency(new Action() { -@Override -public V execute() { -return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key))); -} -}, deleteTime); -} else { -return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key))); +try { +if (deleteTime.shouldRecord()) { +return measureLatency(new Action() { +@Override +public V execute() { +return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key))); +} +}, deleteTime); +} else { +return typeConverter.outerValue(inner.delete(typeConverter.innerKey(key))); +} +} catch (final ProcessorStateException e) { +
[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496089#comment-16496089 ] ASF GitHub Bot commented on KAFKA-6538: --- jadireddi closed pull request #4846: KAFKA-6538: Fixed RocksDBStore to enhance processor state exceptions with more context URL: https://github.com/apache/kafka/pull/4846 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index a2e45e00e56..6555a19a226 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -225,7 +225,7 @@ private void validateStoreOpen() { try { return this.db.get(rawKey); } catch (final RocksDBException e) { -throw new ProcessorStateException("Error while getting value for key from store " + this.name, e); +throw new ProcessorStateException("Error while getting value for key" + wrapToBytes(rawKey) + "from store " + this.name, e); } } @@ -301,13 +301,15 @@ private void putInternal(final byte[] rawKey, try { db.delete(wOptions, rawKey); } catch (final RocksDBException e) { -throw new ProcessorStateException("Error while removing key from store " + this.name, e); +throw new ProcessorStateException("Error while removing key" + wrapToBytes(rawValue) + "from store " + this.name, e); } } else { try { db.put(wOptions, rawKey, rawValue); } catch (final RocksDBException e) { -throw new ProcessorStateException("Error while executing putting key/value into store " + this.name, e); +String str = String.format("Error while executing putting key %s value %s into store %s", wrapToBytes(rawKey), +wrapToBytes(rawValue), this.name); +throw new ProcessorStateException(str, e); } } } @@ -465,6 +467,10 @@ private void closeOpenIterators() { } } +private Bytes wrapToBytes(byte... rawValue) { +return Bytes.wrap(rawValue); +} + private class RocksDbIterator implements KeyValueIterator { private final String storeName; private final RocksIterator iter; This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enhance ByteStore exceptions with more context information > -- > > Key: KAFKA-6538 > URL: https://issues.apache.org/jira/browse/KAFKA-6538 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Jagadesh Adireddi >Priority: Minor > Labels: newbie > > In KIP-182 we refactored all stores to by plain {{Bytes/byte[]}} stores and > only have concrete key/value types on outer layers/wrappers of the stores. > For this reason, the most inner {{RocksDBStore}} cannot provide useful error > messages anymore if a put/get/delete operation fails as it only handles plain > bytes. > In addition, the corresponding calls to record changelog records to record > collectors will also be sending byte arrays only, and hence when there is an > error happening, the record collector can only display the key but not the > value since it is all bytes: > {code} > [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - > task [2_2] Error sending record (key {"eventId":XXX,"version":123} > value [] timestamp YYY) to topic TTT > due to ... > {code} > Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with > corresponding information for which key/value the operation failed in the > wrapping stores (KeyValueStore, WindowedStored, and SessionStore). > Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} > exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496038#comment-16496038 ] ASF GitHub Bot commented on KAFKA-6538: --- jadireddi opened a new pull request #5103: KAFKA-6538: Changes to enhance ByteStore exceptions thrown from RocksDBStore with more human readable info URL: https://github.com/apache/kafka/pull/5103 https://issues.apache.org/jira/browse/KAFKA-6538 Enhanced exceptions thrown from `RocksDBStore` with corresponding information for which key/value the operation failed in the wrapping stores (KeyValueStore, WindowedStored, and SessionStore). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enhance ByteStore exceptions with more context information > -- > > Key: KAFKA-6538 > URL: https://issues.apache.org/jira/browse/KAFKA-6538 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Jagadesh Adireddi >Priority: Minor > Labels: newbie > > In KIP-182 we refactored all stores to by plain {{Bytes/byte[]}} stores and > only have concrete key/value types on outer layers/wrappers of the stores. > For this reason, the most inner {{RocksDBStore}} cannot provide useful error > messages anymore if a put/get/delete operation fails as it only handles plain > bytes. > In addition, the corresponding calls to record changelog records to record > collectors will also be sending byte arrays only, and hence when there is an > error happening, the record collector can only display the key but not the > value since it is all bytes: > {code} > [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - > task [2_2] Error sending record (key {"eventId":XXX,"version":123} > value [] timestamp YYY) to topic TTT > due to ... > {code} > Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with > corresponding information for which key/value the operation failed in the > wrapping stores (KeyValueStore, WindowedStored, and SessionStore). > Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} > exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16432146#comment-16432146 ] ASF GitHub Bot commented on KAFKA-6538: --- jadireddi opened a new pull request #4846: KAFKA-6538: Fixed RocksDBStore to enhance processor state exceptions with more context URL: https://github.com/apache/kafka/pull/4846 https://issues.apache.org/jira/browse/KAFKA-6538 Fixed against `Base:1.1` version. Fixed Class:`RocksDBStore` to enhance Processor State Exception containing Key/Value values as Bytes/byte[] to Bytes/Bytes form in the error message for displaying more useful context . *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enhance ByteStore exceptions with more context information > -- > > Key: KAFKA-6538 > URL: https://issues.apache.org/jira/browse/KAFKA-6538 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Jagadesh Adireddi >Priority: Minor > Labels: newbie > > In KIP-182 we refactored all stores to by plain {{Bytes/byte[]}} stores and > only have concrete key/value types on outer layers/wrappers of the stores. > For this reason, the most inner {{RocksDBStore}} cannot provide useful error > messages anymore if a put/get/delete operation fails as it only handles plain > bytes. > In addition, the corresponding calls to record changelog records to record > collectors will also be sending byte arrays only, and hence when there is an > error happening, the record collector can only display the key but not the > value since it is all bytes: > {code} > [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - > task [2_2] Error sending record (key {"eventId":XXX,"version":123} > value [] timestamp YYY) to topic TTT > due to ... > {code} > Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with > corresponding information for which key/value the operation failed in the > wrapping stores (KeyValueStore, WindowedStored, and SessionStore). > Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} > exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420672#comment-16420672 ] Matthias J. Sax commented on KAFKA-6538: That is correct, and that is exactly the issue. In older releases, we passed in regular Java types/classes into RocksDBStore including corresponding Serdes. Later, we refactored the code and changed RocksDB to only accepttype, and thus, RocksDB cannot provide useful information (RocksDB also does not have any Serdes passed in). Thus, in this ticket, all "higer-lever" stores, that use RocksDB internally need to catch exceptions from RocksDB and add the missing key/value information. RocksDB itself cannot enhance it's exceptions because it's type agnostic. > Enhance ByteStore exceptions with more context information > -- > > Key: KAFKA-6538 > URL: https://issues.apache.org/jira/browse/KAFKA-6538 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Priority: Minor > Labels: newbie > > In KIP-182 we refactored all stores to by plain {{Bytes/byte[]}} stores and > only have concrete key/value types on outer layers/wrappers of the stores. > For this reason, the most inner {{RocksDBStore}} cannot provide useful error > messages anymore if a put/get/delete operation fails as it only handles plain > bytes. > In addition, the corresponding calls to record changelog records to record > collectors will also be sending byte arrays only, and hence when there is an > error happening, the record collector can only display the key but not the > value since it is all bytes: > {code} > [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - > task [2_2] Error sending record (key {"eventId":XXX,"version":123} > value [] timestamp YYY) to topic TTT > due to ... > {code} > Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with > corresponding information for which key/value the operation failed in the > wrapping stores (KeyValueStore, WindowedStored, and SessionStore). > Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} > exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420657#comment-16420657 ] Jagadesh Adireddi commented on KAFKA-6538: -- Hi [~mjsax], To work on this ticket, I would like to get clarification on few things. In `RocksDBStore` can we enhance exception by doing `Bytes.wrap(byte[])` on Key/Value and throw exception. And for Class RecordCollectorImpl, Error message stated in ticket as value [] timestamp YYY) to topic TTT .But i see *value* **type is already in *Bytes*. I guess it already displaying readable value and no changes required. Please do correct me, if i am missing something. > Enhance ByteStore exceptions with more context information > -- > > Key: KAFKA-6538 > URL: https://issues.apache.org/jira/browse/KAFKA-6538 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Priority: Minor > Labels: newbie > > In KIP-182 we refactored all stores to by plain {{Bytes/byte[]}} stores and > only have concrete key/value types on outer layers/wrappers of the stores. > For this reason, the most inner {{RocksDBStore}} cannot provide useful error > messages anymore if a put/get/delete operation fails as it only handles plain > bytes. > In addition, the corresponding calls to record changelog records to record > collectors will also be sending byte arrays only, and hence when there is an > error happening, the record collector can only display the key but not the > value since it is all bytes: > {code} > [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - > task [2_2] Error sending record (key {"eventId":XXX,"version":123} > value [] timestamp YYY) to topic TTT > due to ... > {code} > Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with > corresponding information for which key/value the operation failed in the > wrapping stores (KeyValueStore, WindowedStored, and SessionStore). > Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} > exceptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)