[GitHub] [pulsar] david-streamlio commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-05-14 Thread GitBox


david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r632709623



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -317,46 +322,77 @@ private void setupStateStore() throws Exception {
 }
 }
 
-private void processResult(Record srcRecord,
-   CompletableFuture result) 
throws Exception {
-result.whenComplete((result1, throwable) -> {
-if (throwable != null || result1.getUserException() != null) {
-Throwable t = throwable != null ? throwable : 
result1.getUserException();
-log.warn("Encountered exception when processing message {}",
-srcRecord, t);
-stats.incrUserExceptions(t);
-srcRecord.fail();
-} else {
-if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
+private void processResult(@SuppressWarnings("rawtypes") Record srcRecord,
+   JavaExecutionResult result) throws 
SinkException {
+   
+   if (result.getUserException() != null) {

Review comment:
   This is where we do the error handling for the synchronous function 
calls. The logic inside this block is identical to error handling logic for 
asynchronous function calls.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [pulsar] david-streamlio commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-05-05 Thread GitBox


david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r626865072



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -743,7 +782,8 @@ private void setupInput(ContextImpl contextImpl) throws 
Exception {
 }
 }
 
-private void setupOutput(ContextImpl contextImpl) throws Exception {
+@SuppressWarnings({ "unchecked", "rawtypes" })

Review comment:
   No, they are not critical to the functionality of the PR, I just prefer 
to clean up such errors while I am working on a file since there are rarely, if 
ever, any tickets related to code cleanup efforts. So I just do it in a 
piecemeal fashion as I run across them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [pulsar] david-streamlio commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-05-05 Thread GitBox


david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r626865072



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -743,7 +782,8 @@ private void setupInput(ContextImpl contextImpl) throws 
Exception {
 }
 }
 
-private void setupOutput(ContextImpl contextImpl) throws Exception {
+@SuppressWarnings({ "unchecked", "rawtypes" })

Review comment:
   No, they are not critical to the functionality of the PR, I just prefer 
to clean up such errors while I am working on a file since there are rarely, if 
ever, and tickets related to code cleanup efforts. So I just do it in a 
piecemeal fashion as I run across them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [pulsar] david-streamlio commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-04-26 Thread GitBox


david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r620550371



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -328,7 +342,8 @@ private void processResult(Record srcRecord,
 srcRecord.fail();
 } else {
 if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
+   // Grab the actual result
+   actualResult.set(result1.getResult());

Review comment:
   Can you elaborate a bit? FWIW, I have tried various scenarios to keep 
the call to `sendOutputMessage(srcRecord, result1.getResult());` insdie the 
`whenComplete` completion stage, but in all scenarios, the exception thrown is 
lost. I tried catching the exception and re-throwing a `CompletionException` 
and also tried calling `result.completeExceptionally()` on the sink error as 
well. In both cases the exception thrown by the Sink is lost. This was the only 
way to have the exception thrown by the Sink actually caught and handled.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [pulsar] david-streamlio commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-04-26 Thread GitBox


david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r620550371



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -328,7 +342,8 @@ private void processResult(Record srcRecord,
 srcRecord.fail();
 } else {
 if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
+   // Grab the actual result
+   actualResult.set(result1.getResult());

Review comment:
   Can you elaborate a bit? FWIW, I have tried various scenarios to keep 
the call to `sendOutputMessage(srcRecord, result1.getResult());` insdie the 
`whenComplete` completion stage, but in all scenarios, the exception throw is 
lost. I tried catching the exception and re-throwing a `CompletionException` 
and also tried calling `result.completeExceptionally()` on the sink error as 
well. This was the only way to have the exception thrown by the Sink actually 
caught and handled.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [pulsar] david-streamlio commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-04-26 Thread GitBox


david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r620550371



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -328,7 +342,8 @@ private void processResult(Record srcRecord,
 srcRecord.fail();
 } else {
 if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
+   // Grab the actual result
+   actualResult.set(result1.getResult());

Review comment:
   Can you elaborate a bit? FWIW, I have tried various scenarios to keep 
the call to `endOutputMessage(srcRecord, result1.getResult());` insdie the 
`whenComplete` completion stage, but in all scenarios, the exception throw is 
lost. I tried catching the exception and re-throwing a `CompletionException` 
and also tried calling `result.completeExceptionally()` on the sink error as 
well. This was the only way to have the exception thrown by the Sink actually 
caught and handled.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [pulsar] david-streamlio commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-04-26 Thread GitBox


david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r620550371



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -328,7 +342,8 @@ private void processResult(Record srcRecord,
 srcRecord.fail();
 } else {
 if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
+   // Grab the actual result
+   actualResult.set(result1.getResult());

Review comment:
   Can you elaborate a bit? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org