srkukarni commented on a change in pull request #1621:  refactoring java 
instance to use pull model for message ingress
URL: https://github.com/apache/incubator-pulsar/pull/1621#discussion_r182944409
 
 

 ##########
 File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 ##########
 @@ -204,40 +186,41 @@ public void run() {
                 // process the message
                 Object input;
                 try {
-                    input = 
msg.getInputSerDe().deserialize(msg.getActualMessage().getData());
+                    input = 
currentMessage.getInputSerDe().deserialize(currentMessage.getActualMessage().getData());
                 } catch (Exception ex) {
-                    
stats.incrementDeserializationExceptions(msg.getTopicName());
-                    continue;
+                    
stats.incrementDeserializationExceptions(currentMessage.getTopicName());
+                    throw ex;
                 }
                 long processAt = System.currentTimeMillis();
                 stats.incrementProcessed(processAt);
                 addLogTopicHandler();
-                result = javaInstance.handleMessage(
-                        msg.getActualMessage().getMessageId(),
-                        msg.getTopicName(),
+                JavaExecutionResult result = javaInstance.handleMessage(
+                        currentMessage.getActualMessage().getMessageId(),
+                        currentMessage.getTopicName(),
                         input);
                 removeLogTopicHandler();
 
                 long doneProcessing = System.currentTimeMillis();
                 log.debug("Got result: {}", result.getResult());
 
                 if (null != stateContext) {
-                    stateContext.flush()
-                            .thenRun(() -> processResult(msg, result, 
processAt, doneProcessing))
-                            .exceptionally(cause -> {
-                                // log the messages, since we DONT ack, pulsar 
consumer will re-deliver the messages.
-                                log.error("Failed to flush the state updates 
of message {}", msg, cause);
-                                return null;
-                            });
-                } else {
-                    processResult(msg, result, processAt, doneProcessing);
+                    CompletableFuture completableFuture = stateContext.flush();
 
 Review comment:
   we are sync here as opposed to async before?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to