[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly
jerrypeng commented on a change in pull request #10370: URL: https://github.com/apache/pulsar/pull/10370#discussion_r632667651 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ## @@ -317,46 +322,77 @@ private void setupStateStore() throws Exception { } } -private void processResult(Record srcRecord, - CompletableFuture result) throws Exception { -result.whenComplete((result1, throwable) -> { -if (throwable != null || result1.getUserException() != null) { -Throwable t = throwable != null ? throwable : result1.getUserException(); -log.warn("Encountered exception when processing message {}", -srcRecord, t); -stats.incrUserExceptions(t); -srcRecord.fail(); -} else { -if (result1.getResult() != null) { -sendOutputMessage(srcRecord, result1.getResult()); +private void processResult(@SuppressWarnings("rawtypes") Record srcRecord, + JavaExecutionResult result) throws SinkException { + + if (result.getUserException() != null) { +stats.incrUserExceptions(result.getUserException()); +srcRecord.fail(); +return; + } + + if (result.isAsync()) { + result.getFuture().whenComplete((result1, throwable) -> { + if (throwable != null) { + Throwable t = throwable; + log.warn("Encountered exception when processing message {}", srcRecord, t); + stats.incrUserExceptions(t); + srcRecord.fail(); + } else { +if (result1 != null) { + try { +sendOutputMessage(srcRecord, result1); + } catch (SinkException e) { +log.warn("Encountered exception when publishing result {}", srcRecord, e); + } } else { -if (instanceConfig.getFunctionDetails().getAutoAck()) { -// the function doesn't produce any result or the user doesn't want the result. -srcRecord.ack(); -} + if (instanceConfig.getFunctionDetails().getAutoAck()) { +// the function doesn't produce any result or the user doesn't want the result. 
+srcRecord.ack(); + } } // increment total successfully processed stats.incrTotalProcessedSuccessfully(); + } + }); + } else { Review comment: Whether sync or async the error handling should be consistent: https://github.com/apache/pulsar/pull/10370/files#diff-4b164878665f89af44f9ace0a3df8a27f4f2fb700fd9981d663fb3a7e4317e7aR338 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly
jerrypeng commented on a change in pull request #10370: URL: https://github.com/apache/pulsar/pull/10370#discussion_r632663569 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ## @@ -317,46 +322,77 @@ private void setupStateStore() throws Exception { } } -private void processResult(Record srcRecord, - CompletableFuture result) throws Exception { -result.whenComplete((result1, throwable) -> { -if (throwable != null || result1.getUserException() != null) { -Throwable t = throwable != null ? throwable : result1.getUserException(); -log.warn("Encountered exception when processing message {}", -srcRecord, t); -stats.incrUserExceptions(t); -srcRecord.fail(); -} else { -if (result1.getResult() != null) { -sendOutputMessage(srcRecord, result1.getResult()); +private void processResult(@SuppressWarnings("rawtypes") Record srcRecord, + JavaExecutionResult result) throws SinkException { + + if (result.getUserException() != null) { +stats.incrUserExceptions(result.getUserException()); +srcRecord.fail(); +return; + } + + if (result.isAsync()) { + result.getFuture().whenComplete((result1, throwable) -> { + if (throwable != null) { + Throwable t = throwable; + log.warn("Encountered exception when processing message {}", srcRecord, t); + stats.incrUserExceptions(t); + srcRecord.fail(); + } else { +if (result1 != null) { + try { +sendOutputMessage(srcRecord, result1); + } catch (SinkException e) { +log.warn("Encountered exception when publishing result {}", srcRecord, e); + } } else { -if (instanceConfig.getFunctionDetails().getAutoAck()) { -// the function doesn't produce any result or the user doesn't want the result. -srcRecord.ack(); -} + if (instanceConfig.getFunctionDetails().getAutoAck()) { +// the function doesn't produce any result or the user doesn't want the result. 
+srcRecord.ack(); + } } // increment total successfully processed stats.incrTotalProcessedSuccessfully(); + } + }); + } else { Review comment: In this code path we still need to check if there was a user exception thrown in the function and fail the record if there was. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly
jerrypeng commented on a change in pull request #10370: URL: https://github.com/apache/pulsar/pull/10370#discussion_r627842825 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ## @@ -317,46 +322,79 @@ private void setupStateStore() throws Exception { } } -private void processResult(Record srcRecord, - CompletableFuture result) throws Exception { -result.whenComplete((result1, throwable) -> { -if (throwable != null || result1.getUserException() != null) { -Throwable t = throwable != null ? throwable : result1.getUserException(); -log.warn("Encountered exception when processing message {}", +private void processResult(@SuppressWarnings("rawtypes") Record srcRecord, + JavaExecutionResult result) throws SinkException { + + if (result.getUserException() != null) { + stats.incrUserExceptions(result.getUserException()); Review comment: still seems to be some formatting issues ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ## @@ -317,46 +322,79 @@ private void setupStateStore() throws Exception { } } -private void processResult(Record srcRecord, - CompletableFuture result) throws Exception { -result.whenComplete((result1, throwable) -> { -if (throwable != null || result1.getUserException() != null) { -Throwable t = throwable != null ? 
throwable : result1.getUserException(); -log.warn("Encountered exception when processing message {}", +private void processResult(@SuppressWarnings("rawtypes") Record srcRecord, + JavaExecutionResult result) throws SinkException { + + if (result.getUserException() != null) { + stats.incrUserExceptions(result.getUserException()); +srcRecord.fail(); +return; + } + + if (result.isAsync()) { + result.getFuture().whenComplete((result1, throwable) -> { + if (throwable != null) { + Throwable t = throwable; + log.warn("Encountered exception when processing message {}", srcRecord, t); -stats.incrUserExceptions(t); -srcRecord.fail(); -} else { -if (result1.getResult() != null) { -sendOutputMessage(srcRecord, result1.getResult()); -} else { + stats.incrUserExceptions(t); + srcRecord.fail(); + } else { + if (result1 != null) { + try { + sendOutputMessage(srcRecord, result1); + } catch (SinkException e) { Review comment: formatting issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly
jerrypeng commented on a change in pull request #10370: URL: https://github.com/apache/pulsar/pull/10370#discussion_r626798469 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ## @@ -743,7 +782,8 @@ private void setupInput(ContextImpl contextImpl) throws Exception { } } -private void setupOutput(ContextImpl contextImpl) throws Exception { +@SuppressWarnings({ "unchecked", "rawtypes" }) Review comment: Do we need to add these `@SuppressWarnings` as part of this PR? Seems like a separate issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly
jerrypeng commented on a change in pull request #10370: URL: https://github.com/apache/pulsar/pull/10370#discussion_r626793224 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ## @@ -317,46 +322,79 @@ private void setupStateStore() throws Exception { } } -private void processResult(Record srcRecord, - CompletableFuture result) throws Exception { -result.whenComplete((result1, throwable) -> { -if (throwable != null || result1.getUserException() != null) { -Throwable t = throwable != null ? throwable : result1.getUserException(); -log.warn("Encountered exception when processing message {}", +private void processResult(@SuppressWarnings("rawtypes") Record srcRecord, + JavaExecutionResult result) throws SinkException { + + if (result.getUserException() != null) { + stats.incrUserExceptions(result.getUserException()); +srcRecord.fail(); +return; + } + + if (result.isAsync()) { + result.getFuture().whenComplete((result1, throwable) -> { + if (throwable != null) { + Throwable t = throwable; + log.warn("Encountered exception when processing message {}", srcRecord, t); -stats.incrUserExceptions(t); -srcRecord.fail(); -} else { -if (result1.getResult() != null) { -sendOutputMessage(srcRecord, result1.getResult()); -} else { + stats.incrUserExceptions(t); + srcRecord.fail(); + } else { + if (result1 != null) { + try { + sendOutputMessage(srcRecord, result1); Review comment: too many spaces? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly
jerrypeng commented on a change in pull request #10370: URL: https://github.com/apache/pulsar/pull/10370#discussion_r620546174 ## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ## @@ -328,7 +342,8 @@ private void processResult(Record srcRecord, srcRecord.fail(); } else { if (result1.getResult() != null) { -sendOutputMessage(srcRecord, result1.getResult()); + // Grab the actual result + actualResult.set(result1.getResult()); Review comment: I don't think this is going to work -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org