[GitHub] [pulsar] david-streamlio commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly
david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r632709623

## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ##

@@ -317,46 +322,77 @@ private void setupStateStore() throws Exception {
         }
     }
-    private void processResult(Record srcRecord,
-                               CompletableFuture result) throws Exception {
-        result.whenComplete((result1, throwable) -> {
-            if (throwable != null || result1.getUserException() != null) {
-                Throwable t = throwable != null ? throwable : result1.getUserException();
-                log.warn("Encountered exception when processing message {}",
-                        srcRecord, t);
-                stats.incrUserExceptions(t);
-                srcRecord.fail();
-            } else {
-                if (result1.getResult() != null) {
-                    sendOutputMessage(srcRecord, result1.getResult());
+    private void processResult(@SuppressWarnings("rawtypes") Record srcRecord,
+                               JavaExecutionResult result) throws SinkException {
+
+        if (result.getUserException() != null) {

Review comment:
This is where we do the error handling for the synchronous function calls. The logic inside this block is identical to error handling logic for asynchronous function calls.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
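To illustrate the point about identical error handling on both paths, here is a minimal sketch in which one helper decides whether a result counts as a failure, and both the synchronous and the asynchronous path delegate to it. The names `ExecResult`, `failedWith`, `handleSync`, and `handleAsync` are hypothetical stand-ins and are not taken from the PR; only the expression that picks between the async throwable and the user exception mirrors the removed code in the diff above.

```java
import java.util.concurrent.CompletableFuture;

class SharedErrorHandlingSketch {

    // Hypothetical stand-in for JavaExecutionResult: a value or a user exception.
    static final class ExecResult {
        final Object value;
        final Exception userException;

        ExecResult(Object value, Exception userException) {
            this.value = value;
            this.userException = userException;
        }
    }

    // Hypothetical counter standing in for stats.incrUserExceptions(t).
    static int userExceptionCount = 0;

    // Shared decision: an async failure takes precedence, otherwise the user exception.
    // Returns true when the record should be treated as failed.
    static boolean failedWith(ExecResult result, Throwable asyncError) {
        Throwable t = asyncError != null ? asyncError : result.userException;
        if (t == null) {
            return false;
        }
        userExceptionCount++;
        return true;
    }

    // Synchronous path: there is no future, so there is no async throwable.
    static boolean handleSync(ExecResult result) {
        return failedWith(result, null);
    }

    // Asynchronous path: handle() receives either the result or the throwable.
    static boolean handleAsync(CompletableFuture<ExecResult> future) {
        return future.handle((result, throwable) -> failedWith(result, throwable)).join();
    }
}
```

Routing both paths through a single helper is one way to keep the two branches from drifting apart; whether the PR does this or simply repeats the block is not shown in the quoted hunk.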
david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r626865072

## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ##

@@ -743,7 +782,8 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
         }
     }
-    private void setupOutput(ContextImpl contextImpl) throws Exception {
+    @SuppressWarnings({ "unchecked", "rawtypes" })

Review comment:
No, they are not critical to the functionality of the PR. I just prefer to clean up such errors while I am working on a file, since there are rarely, if ever, any tickets related to code cleanup efforts. So I do it in a piecemeal fashion as I run across them.
david-streamlio commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r620550371

## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ##

@@ -328,7 +342,8 @@ private void processResult(Record srcRecord,
                 srcRecord.fail();
             } else {
                 if (result1.getResult() != null) {
-                    sendOutputMessage(srcRecord, result1.getResult());
+                    // Grab the actual result
+                    actualResult.set(result1.getResult());

Review comment:
Can you elaborate a bit? FWIW, I have tried various scenarios to keep the call to `sendOutputMessage(srcRecord, result1.getResult());` inside the `whenComplete` completion stage, but in all scenarios, the exception thrown is lost. I tried catching the exception and re-throwing a `CompletionException`, and also tried calling `result.completeExceptionally()` on the sink error. In both cases the exception thrown by the Sink is lost. This was the only way to have the exception thrown by the Sink actually caught and handled.
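The behaviour described in this comment can be reproduced with plain `CompletableFuture`, independent of Pulsar. The sketch below is an illustration under stated assumptions, not the PR's code: `failingSinkWrite` is a hypothetical stand-in for a sink write that throws, and `actualResult` is assumed to be an `AtomicReference`, which the `actualResult.set(...)` call in the diff suggests but does not confirm. An exception thrown inside a `whenComplete` callback is captured by the stage that `whenComplete` returns, so it never reaches the calling thread unless that returned stage is inspected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteDemo {

    // Hypothetical stand-in for a sink write that fails.
    static void failingSinkWrite(String value) {
        throw new IllegalStateException("sink failed for " + value);
    }

    // Sink call inside the callback: the exception does not propagate to the caller.
    static boolean errorReachesCallerInsideCallback() {
        CompletableFuture<String> result = CompletableFuture.completedFuture("record-1");
        try {
            result.whenComplete((value, throwable) -> failingSinkWrite(value));
            return false; // whenComplete returned normally; nothing was thrown here
        } catch (RuntimeException e) {
            return true;
        }
    }

    // The exception is not gone: it completes the stage returned by whenComplete.
    static boolean returnedStageHoldsError() {
        CompletableFuture<String> result = CompletableFuture.completedFuture("record-1");
        CompletableFuture<String> next =
                result.whenComplete((value, throwable) -> failingSinkWrite(value));
        return next.isCompletedExceptionally() && !result.isCompletedExceptionally();
    }

    // Capture the value in the callback, then call the sink on the calling thread.
    static boolean errorReachesCallerOutsideCallback() {
        CompletableFuture<String> result = CompletableFuture.completedFuture("record-1");
        AtomicReference<String> actualResult = new AtomicReference<>();
        result.whenComplete((value, throwable) -> {
            if (throwable == null) {
                actualResult.set(value); // grab the actual result
            }
        }).join();
        try {
            failingSinkWrite(actualResult.get());
            return false;
        } catch (IllegalStateException e) {
            return true; // the caller can now catch and handle the sink error
        }
    }
}
```

The second method also hints at why calling `completeExceptionally()` on the original future has no effect in this situation: by the time the callback runs, that future is already complete, so the call is a no-op.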