[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504200#comment-16504200
 ] 

ASF GitHub Bot commented on KAFKA-6538:
---

mjsax closed pull request #5103: KAFKA-6538: Changes to enhance  ByteStore 
exceptions thrown from RocksDBStore with more human readable info
URL: https://github.com/apache/kafka/pull/5103
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
index 14464e09b00..200b2d7870a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InnerMeteredKeyValueStore.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -173,30 +174,40 @@ public long approximateNumEntries() {
 
 @Override
 public V get(final K key) {
-if (getTime.shouldRecord()) {
-return measureLatency(new Action() {
-@Override
-public V execute() {
-return 
typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
-}
-}, getTime);
-} else {
-return 
typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+try {
+if (getTime.shouldRecord()) {
+return measureLatency(new Action() {
+@Override
+public V execute() {
+return 
typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+}
+}, getTime);
+} else {
+return 
typeConverter.outerValue(inner.get(typeConverter.innerKey(key)));
+}
+} catch (final ProcessorStateException e) {
+final String message = String.format(e.getMessage(), key);
+throw new ProcessorStateException(message, e);
 }
 }
 
 @Override
 public void put(final K key, final V value) {
-if (putTime.shouldRecord()) {
-measureLatency(new Action() {
-@Override
-public V execute() {
-inner.put(typeConverter.innerKey(key), 
typeConverter.innerValue(value));
-return null;
-}
-}, putTime);
-} else {
-inner.put(typeConverter.innerKey(key), 
typeConverter.innerValue(value));
+try {
+if (putTime.shouldRecord()) {
+measureLatency(new Action() {
+@Override
+public V execute() {
+inner.put(typeConverter.innerKey(key), 
typeConverter.innerValue(value));
+return null;
+}
+}, putTime);
+} else {
+inner.put(typeConverter.innerKey(key), 
typeConverter.innerValue(value));
+}
+} catch (final ProcessorStateException e) {
+final String message = String.format(e.getMessage(), key, value);
+throw new ProcessorStateException(message, e);
 }
 }
 
@@ -232,15 +243,20 @@ public V execute() {
 
 @Override
 public V delete(final K key) {
-if (deleteTime.shouldRecord()) {
-return measureLatency(new Action() {
-@Override
-public V execute() {
-return 
typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
-}
-}, deleteTime);
-} else {
-return 
typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+try {
+if (deleteTime.shouldRecord()) {
+return measureLatency(new Action() {
+@Override
+public V execute() {
+return 
typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+}
+}, deleteTime);
+} else {
+return 
typeConverter.outerValue(inner.delete(typeConverter.innerKey(key)));
+}
+} catch (final ProcessorStateException e) {
+

[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496089#comment-16496089
 ] 

ASF GitHub Bot commented on KAFKA-6538:
---

jadireddi closed pull request #4846: KAFKA-6538: Fixed RocksDBStore to enhance 
processor state  exceptions with more context
URL: https://github.com/apache/kafka/pull/4846
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a2e45e00e56..6555a19a226 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -225,7 +225,7 @@ private void validateStoreOpen() {
 try {
 return this.db.get(rawKey);
 } catch (final RocksDBException e) {
-throw new ProcessorStateException("Error while getting value for 
key from store " + this.name, e);
+throw new ProcessorStateException("Error while getting value for 
key " + wrapToBytes(rawKey) + " from store " + this.name, e);
 }
 }
 
@@ -301,13 +301,15 @@ private void putInternal(final byte[] rawKey,
 try {
 db.delete(wOptions, rawKey);
 } catch (final RocksDBException e) {
-throw new ProcessorStateException("Error while removing key 
from store " + this.name, e);
+throw new ProcessorStateException("Error while removing key " + 
wrapToBytes(rawKey) + " from store " + this.name, e);
 }
 } else {
 try {
 db.put(wOptions, rawKey, rawValue);
 } catch (final RocksDBException e) {
-throw new ProcessorStateException("Error while executing 
putting key/value into store " + this.name, e);
+String str = String.format("Error while executing putting key 
%s value %s into store %s", wrapToBytes(rawKey),
+wrapToBytes(rawValue), this.name);
+throw new ProcessorStateException(str, e);
 }
 }
 }
@@ -465,6 +467,10 @@ private void closeOpenIterators() {
 }
 }
 
+private Bytes wrapToBytes(byte... rawValue) {
+return Bytes.wrap(rawValue);
+}
+
 private class RocksDbIterator implements KeyValueIterator {
 private final String storeName;
 private final RocksIterator iter;


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enhance ByteStore exceptions with more context information
> --
>
> Key: KAFKA-6538
> URL: https://issues.apache.org/jira/browse/KAFKA-6538
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Jagadesh Adireddi
>Priority: Minor
>  Labels: newbie
>
> In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and 
> only have concrete key/value types on the outer layers/wrappers of the stores.
> For this reason, the innermost {{RocksDBStore}} can no longer provide useful 
> error messages if a put/get/delete operation fails, as it only handles plain 
> bytes.
> In addition, the corresponding calls that send changelog records to the 
> record collector pass byte arrays only, so when an error occurs, the record 
> collector can only display the key but not the value, since it is all 
> bytes:
> {code}
> [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl   -
> task [2_2] Error sending record (key {"eventId":XXX,"version":123}
> value [] timestamp YYY) to topic TTT
> due to ...
> {code} 
> Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with 
> the key/value for which the operation failed, in the 
> wrapping stores (KeyValueStore, WindowStore, and SessionStore).
> Cf. https://github.com/apache/kafka/pull/4518, which cleans up {{RocksDBStore}} 
> exceptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information

2018-05-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496038#comment-16496038
 ] 

ASF GitHub Bot commented on KAFKA-6538:
---

jadireddi opened a new pull request #5103: KAFKA-6538: Changes to enhance  
ByteStore exceptions thrown from RocksDBStore with more human readable info
URL: https://github.com/apache/kafka/pull/5103
 
 
   https://issues.apache.org/jira/browse/KAFKA-6538
   
   Enhanced the exceptions thrown from `RocksDBStore` so that the wrapping 
stores (KeyValueStore, WindowStore, and SessionStore) report the key/value 
for which the operation failed. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information

2018-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16432146#comment-16432146
 ] 

ASF GitHub Bot commented on KAFKA-6538:
---

jadireddi opened a new pull request #4846: KAFKA-6538: Fixed RocksDBStore to 
enhance processor state  exceptions with more context
URL: https://github.com/apache/kafka/pull/4846
 
 
   https://issues.apache.org/jira/browse/KAFKA-6538
   Fixed against the `1.1` base branch.
   Changed `RocksDBStore` so that its `ProcessorStateException` messages 
render the key/value as `Bytes` rather than raw `byte[]`, giving more useful 
context in the error message.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information

2018-03-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420672#comment-16420672
 ] 

Matthias J. Sax commented on KAFKA-6538:


That is correct, and that is exactly the issue. In older releases, we passed 
regular Java types/classes into RocksDBStore, including the corresponding Serdes. 
Later, we refactored the code and changed RocksDB to only accept the 
<Bytes, byte[]> type, and thus RocksDB cannot provide useful information 
(RocksDB also does not have any Serdes passed in). Thus, in this ticket, all 
"higher-level" stores that use RocksDB internally need to catch exceptions from 
RocksDB and add the missing key/value information. RocksDB itself cannot 
enhance its exceptions because it's type agnostic.
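
The catch-and-enrich approach described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Kafka's code: `ByteStore`, `TypedStore`, and `StoreException` are hypothetical stand-ins for the inner bytes store, a typed wrapping store, and `ProcessorStateException`. The PR #5103 diff quoted in this thread fills the inner message with the key via `String.format`; the sketch appends the key instead so that it does not depend on the wording of the inner message.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class WrappedStoreSketch {

    // Hypothetical stand-in for ProcessorStateException.
    static class StoreException extends RuntimeException {
        StoreException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical inner store: it only sees raw bytes and has no serdes.
    interface ByteStore {
        byte[] get(byte[] rawKey);
    }

    // Hypothetical typed wrapper: it knows the real key type,
    // so it is the layer that can put a readable key into the error.
    static class TypedStore<K> {
        private final ByteStore inner;
        private final Function<K, byte[]> keySerializer;

        TypedStore(ByteStore inner, Function<K, byte[]> keySerializer) {
            this.inner = inner;
            this.keySerializer = keySerializer;
        }

        byte[] get(K key) {
            try {
                return inner.get(keySerializer.apply(key));
            } catch (StoreException e) {
                // Re-throw with the typed key added; keep the original as the cause.
                throw new StoreException(e.getMessage() + " (key: " + key + ")", e);
            }
        }
    }

    public static void main(String[] args) {
        // An inner store that always fails, to exercise the error path.
        ByteStore failing = rawKey -> {
            throw new StoreException("Error while getting value from store demo-store", null);
        };
        TypedStore<String> store =
            new TypedStore<>(failing, k -> k.getBytes(StandardCharsets.UTF_8));
        try {
            store.get("user-42");
        } catch (StoreException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the original exception as the cause preserves the inner stack trace, while the outer message carries the information that only the typed layer has.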



[jira] [Commented] (KAFKA-6538) Enhance ByteStore exceptions with more context information

2018-03-30 Thread Jagadesh Adireddi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420657#comment-16420657
 ] 

Jagadesh Adireddi commented on KAFKA-6538:
--

Hi [~mjsax],
To work on this ticket, I would like to get clarification on a few things. In 
`RocksDBStore`, can we enhance the exception by calling `Bytes.wrap(byte[])` on 
the key/value before throwing the exception? And for the class 
RecordCollectorImpl, the error message is stated in the ticket as 
value [] timestamp YYY) to topic TTT
but I see that the *value* type is already *Bytes*. I guess it is already 
displaying a readable value and no changes are required. Please correct me if 
I am missing something.
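
For context on why the rendering of the bytes matters in the message text: concatenating a raw `byte[]` into a string yields only the array's type tag and identity hash, not its contents. The sketch below shows the difference using a hypothetical `toHex` helper as a stand-in for a readable wrapper. It is not Kafka's `Bytes.toString()`, whose output format is not shown in this thread.

```java
import java.nio.charset.StandardCharsets;

public class ByteRenderingSketch {

    // Hypothetical helper: renders each byte as two lowercase hex digits.
    static String toHex(byte[] raw) {
        StringBuilder sb = new StringBuilder(raw.length * 2);
        for (byte b : raw) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] rawKey = "k1".getBytes(StandardCharsets.UTF_8);
        // Default array toString: starts with "[B@" followed by an identity hash.
        System.out.println("Error while getting value for key " + rawKey);
        // Readable rendering of the same bytes.
        System.out.println("Error while getting value for key " + toHex(rawKey));
    }
}
```

Either rendering still shows serialized bytes rather than the typed key, which is why the typed key itself can only be added in a layer that knows the key type.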



 





 
