[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-05-14 Thread GitBox


jerrypeng commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r632667651



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -317,46 +322,77 @@ private void setupStateStore() throws Exception {
 }
 }
 
-private void processResult(Record srcRecord,
-   CompletableFuture result) throws Exception {
-result.whenComplete((result1, throwable) -> {
-if (throwable != null || result1.getUserException() != null) {
-Throwable t = throwable != null ? throwable : result1.getUserException();
-log.warn("Encountered exception when processing message {}",
-srcRecord, t);
-stats.incrUserExceptions(t);
-srcRecord.fail();
-} else {
-if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
+private void processResult(@SuppressWarnings("rawtypes") Record srcRecord,
+   JavaExecutionResult result) throws SinkException {
+   
+   if (result.getUserException() != null) {
+stats.incrUserExceptions(result.getUserException());
+srcRecord.fail();
+return;
+   }
+   
+   if (result.isAsync()) {
+  result.getFuture().whenComplete((result1, throwable) -> {
+  if (throwable != null) {
+  Throwable t = throwable;
+  log.warn("Encountered exception when processing message {}", srcRecord, t);
+  stats.incrUserExceptions(t);
+  srcRecord.fail();
+  } else {
+if (result1 != null) {
+  try {
+sendOutputMessage(srcRecord, result1);
+  } catch (SinkException e) {
+log.warn("Encountered exception when publishing result {}", srcRecord, e);
+  }
 } else {
-if (instanceConfig.getFunctionDetails().getAutoAck()) {
-// the function doesn't produce any result or the user doesn't want the result.
-srcRecord.ack();
-}
+  if (instanceConfig.getFunctionDetails().getAutoAck()) {
+// the function doesn't produce any result or the user doesn't want the result.
+srcRecord.ack();
+  }
 }
 // increment total successfully processed
 stats.incrTotalProcessedSuccessfully();
+  }
+  }); 
+ } else {

Review comment:
   Whether sync or async, the error handling should be consistent:
   
   
https://github.com/apache/pulsar/pull/10370/files#diff-4b164878665f89af44f9ace0a3df8a27f4f2fb700fd9981d663fb3a7e4317e7aR338
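One way to read the request for consistent handling is to route both the synchronous and the asynchronous path through a single completion routine, so that logging, the exception counter, and `fail()` cannot drift apart. The following is a minimal sketch under that assumption. `FakeRecord`, the counters, and every method name here are hypothetical stand-ins for illustration, not the classes or the structure used in the PR.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch only: every type and method here is a hypothetical stand-in, not a Pulsar class. */
final class ConsistentErrorHandlingSketch {

    /** Hypothetical stand-in for the source record; it only counts ack/fail calls. */
    static final class FakeRecord {
        int acks;
        int fails;
        void ack() { acks++; }
        void fail() { fails++; }
    }

    // Hypothetical stand-ins for the stats counters.
    int userExceptions;
    int processedSuccessfully;

    /** Synchronous path: the output and any user exception are already known. */
    void processSync(FakeRecord record, Object output, Throwable userException) {
        complete(record, output, userException);
    }

    /** Asynchronous path: the same routine runs once the future completes. */
    void processAsync(FakeRecord record, CompletableFuture<Object> future) {
        future.whenComplete((output, throwable) -> complete(record, output, throwable));
    }

    /** The single place where a result is turned into fail() or ack(). */
    private void complete(FakeRecord record, Object output, Throwable error) {
        if (error != null) {
            System.err.println("Encountered exception when processing message: " + error);
            userExceptions++;
            record.fail();
            return;
        }
        // Real code would publish a non-null output here; the sketch only acks.
        record.ack();
        processedSuccessfully++;
    }
}
```

With this shape, a failure observed on either path increments the same counter and fails the record in the same way.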




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-05-14 Thread GitBox


jerrypeng commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r632663569



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -317,46 +322,77 @@ private void setupStateStore() throws Exception {
 }
 }
 
-private void processResult(Record srcRecord,
-   CompletableFuture result) throws Exception {
-result.whenComplete((result1, throwable) -> {
-if (throwable != null || result1.getUserException() != null) {
-Throwable t = throwable != null ? throwable : result1.getUserException();
-log.warn("Encountered exception when processing message {}",
-srcRecord, t);
-stats.incrUserExceptions(t);
-srcRecord.fail();
-} else {
-if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
+private void processResult(@SuppressWarnings("rawtypes") Record srcRecord,
+   JavaExecutionResult result) throws SinkException {
+   
+   if (result.getUserException() != null) {
+stats.incrUserExceptions(result.getUserException());
+srcRecord.fail();
+return;
+   }
+   
+   if (result.isAsync()) {
+  result.getFuture().whenComplete((result1, throwable) -> {
+  if (throwable != null) {
+  Throwable t = throwable;
+  log.warn("Encountered exception when processing message {}", srcRecord, t);
+  stats.incrUserExceptions(t);
+  srcRecord.fail();
+  } else {
+if (result1 != null) {
+  try {
+sendOutputMessage(srcRecord, result1);
+  } catch (SinkException e) {
+log.warn("Encountered exception when publishing result {}", srcRecord, e);
+  }
 } else {
-if (instanceConfig.getFunctionDetails().getAutoAck()) {
-// the function doesn't produce any result or the user doesn't want the result.
-srcRecord.ack();
-}
+  if (instanceConfig.getFunctionDetails().getAutoAck()) {
+// the function doesn't produce any result or the user doesn't want the result.
+srcRecord.ack();
+  }
 }
 // increment total successfully processed
 stats.incrTotalProcessedSuccessfully();
+  }
+  }); 
+ } else {

Review comment:
   In this code path we still need to check whether a user exception was
thrown in the function, and fail the record if there was.
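The check being asked for on the synchronous path could look roughly like the sketch below: look at the user exception first, fail the record and stop, and only then decide between publishing and auto-acking. This is an illustration under stated assumptions. `FakeRecord`, `FakeResult`, `Outcome`, and `processSyncResult` are hypothetical names, and the real method would call `sendOutputMessage` where the sketch only returns `PUBLISHED`.

```java
/** Sketch only: every type here is a hypothetical stand-in, not a Pulsar class. */
final class SyncPathUserExceptionSketch {

    enum Outcome { FAILED, PUBLISHED, ACKED, LEFT_UNACKED }

    /** Hypothetical stand-in for the source record. */
    static final class FakeRecord {
        boolean acked;
        boolean failed;
        void ack() { acked = true; }
        void fail() { failed = true; }
    }

    /** Hypothetical stand-in for the execution result of a synchronous function call. */
    static final class FakeResult {
        final Object value;
        final Exception userException;
        FakeResult(Object value, Exception userException) {
            this.value = value;
            this.userException = userException;
        }
    }

    static Outcome processSyncResult(FakeRecord record, FakeResult result, boolean autoAck) {
        // Check the user exception before anything else and fail the record.
        if (result.userException != null) {
            record.fail();
            return Outcome.FAILED;
        }
        if (result.value != null) {
            // Real code would hand the value to the sink here.
            return Outcome.PUBLISHED;
        }
        // No output: ack only when auto-ack is enabled.
        if (autoAck) {
            record.ack();
            return Outcome.ACKED;
        }
        return Outcome.LEFT_UNACKED;
    }
}
```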








[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-05-06 Thread GitBox


jerrypeng commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r627842825



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -317,46 +322,79 @@ private void setupStateStore() throws Exception {
 }
 }
 
-private void processResult(Record srcRecord,
-   CompletableFuture result) throws Exception {
-result.whenComplete((result1, throwable) -> {
-if (throwable != null || result1.getUserException() != null) {
-Throwable t = throwable != null ? throwable : result1.getUserException();
-log.warn("Encountered exception when processing message {}",
+private void processResult(@SuppressWarnings("rawtypes") Record srcRecord,
+   JavaExecutionResult result) throws SinkException {
+   
+   if (result.getUserException() != null) {
+   stats.incrUserExceptions(result.getUserException());

Review comment:
   There still seem to be some formatting issues.

##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -317,46 +322,79 @@ private void setupStateStore() throws Exception {
 }
 }
 
-private void processResult(Record srcRecord,
-   CompletableFuture result) throws Exception {
-result.whenComplete((result1, throwable) -> {
-if (throwable != null || result1.getUserException() != null) {
-Throwable t = throwable != null ? throwable : result1.getUserException();
-log.warn("Encountered exception when processing message {}",
+private void processResult(@SuppressWarnings("rawtypes") Record srcRecord,
+   JavaExecutionResult result) throws SinkException {
+   
+   if (result.getUserException() != null) {
+   stats.incrUserExceptions(result.getUserException());
+srcRecord.fail();
+return;
+   }
+   
+   if (result.isAsync()) {
+  result.getFuture().whenComplete((result1, throwable) -> {
+  if (throwable != null) {
+  Throwable t = throwable;
+  log.warn("Encountered exception when processing message {}",
 srcRecord, t);
-stats.incrUserExceptions(t);
-srcRecord.fail();
-} else {
-if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
-} else {
+  stats.incrUserExceptions(t);
+  srcRecord.fail();
+  } else {
+  if (result1 != null) {
+   try {
+  sendOutputMessage(srcRecord, result1);
+   } catch (SinkException e) {

Review comment:
   formatting issue?








[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-05-05 Thread GitBox


jerrypeng commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r626798469



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -743,7 +782,8 @@ private void setupInput(ContextImpl contextImpl) throws Exception {
 }
 }
 
-private void setupOutput(ContextImpl contextImpl) throws Exception {
+@SuppressWarnings({ "unchecked", "rawtypes" })

Review comment:
   Do we need to add these `@SuppressWarnings` as part of this PR? Seems like a separate issue?








[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-05-05 Thread GitBox


jerrypeng commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r626793224



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -317,46 +322,79 @@ private void setupStateStore() throws Exception {
 }
 }
 
-private void processResult(Record srcRecord,
-   CompletableFuture result) throws Exception {
-result.whenComplete((result1, throwable) -> {
-if (throwable != null || result1.getUserException() != null) {
-Throwable t = throwable != null ? throwable : result1.getUserException();
-log.warn("Encountered exception when processing message {}",
+private void processResult(@SuppressWarnings("rawtypes") Record srcRecord,
+   JavaExecutionResult result) throws SinkException {
+   
+   if (result.getUserException() != null) {
+   stats.incrUserExceptions(result.getUserException());
+srcRecord.fail();
+return;
+   }
+   
+   if (result.isAsync()) {
+  result.getFuture().whenComplete((result1, throwable) -> {
+  if (throwable != null) {
+  Throwable t = throwable;
+  log.warn("Encountered exception when processing message {}",
 srcRecord, t);
-stats.incrUserExceptions(t);
-srcRecord.fail();
-} else {
-if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
-} else {
+  stats.incrUserExceptions(t);
+  srcRecord.fail();
+  } else {
+  if (result1 != null) {
+   try {
+   sendOutputMessage(srcRecord, result1);

Review comment:
   too many spaces?








[GitHub] [pulsar] jerrypeng commented on a change in pull request #10370: [Issue-10269] : [Functions] : Pulsar IO Sink errors aren't bubbled up properly

2021-04-26 Thread GitBox


jerrypeng commented on a change in pull request #10370:
URL: https://github.com/apache/pulsar/pull/10370#discussion_r620546174



##
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##
@@ -328,7 +342,8 @@ private void processResult(Record srcRecord,
 srcRecord.fail();
 } else {
 if (result1.getResult() != null) {
-sendOutputMessage(srcRecord, result1.getResult());
+   // Grab the actual result
+   actualResult.set(result1.getResult());

Review comment:
   I don't think this is going to work
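The comment does not say why the approach would fail. One possible concern, offered here purely as an assumption and not as the reviewer's stated reasoning, is that a value stored from inside a `whenComplete` callback is only visible after the future has completed, so code that reads the holder right after registering the callback may still see `null`. The sketch below demonstrates that general behaviour of `CompletableFuture`. The class and method names are hypothetical, and `actualResult` is borrowed from the diff only as a variable name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch only: illustrates reading a holder before an async callback has run. */
final class CapturedAsyncResultSketch {

    /**
     * Registers a callback that stores the future's value in a holder,
     * then reads the holder immediately without waiting for completion.
     */
    static Object readImmediately(CompletableFuture<Object> future) {
        AtomicReference<Object> actualResult = new AtomicReference<>();
        future.whenComplete((value, throwable) -> actualResult.set(value));
        // If the future is still pending, the callback has not run yet
        // and the holder is still empty at this point.
        return actualResult.get();
    }
}
```

When the future is already complete at registration time, the callback runs on the calling thread and the read sees the value. When it is still pending, the read returns `null`, regardless of what the future later completes with.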



