srkukarni commented on a change in pull request #1621: refactoring java instance to use pull model for message ingress URL: https://github.com/apache/incubator-pulsar/pull/1621#discussion_r182944409
########## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java ########## @@ -204,40 +186,41 @@ public void run() { // process the message Object input; try { - input = msg.getInputSerDe().deserialize(msg.getActualMessage().getData()); + input = currentMessage.getInputSerDe().deserialize(currentMessage.getActualMessage().getData()); } catch (Exception ex) { - stats.incrementDeserializationExceptions(msg.getTopicName()); - continue; + stats.incrementDeserializationExceptions(currentMessage.getTopicName()); + throw ex; } long processAt = System.currentTimeMillis(); stats.incrementProcessed(processAt); addLogTopicHandler(); - result = javaInstance.handleMessage( - msg.getActualMessage().getMessageId(), - msg.getTopicName(), + JavaExecutionResult result = javaInstance.handleMessage( + currentMessage.getActualMessage().getMessageId(), + currentMessage.getTopicName(), input); removeLogTopicHandler(); long doneProcessing = System.currentTimeMillis(); log.debug("Got result: {}", result.getResult()); if (null != stateContext) { - stateContext.flush() - .thenRun(() -> processResult(msg, result, processAt, doneProcessing)) - .exceptionally(cause -> { - // log the messages, since we DONT ack, pulsar consumer will re-deliver the messages. - log.error("Failed to flush the state updates of message {}", msg, cause); - return null; - }); - } else { - processResult(msg, result, processAt, doneProcessing); + CompletableFuture completableFuture = stateContext.flush(); Review comment: Are we flushing synchronously here now, as opposed to asynchronously before? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services