Attila Bukor has posted comments on this change. ( http://gerrit.cloudera.org:8080/14329 )
Change subject: [java] KUDU-2791: process communicates via protobuf-based protocol
......................................................................


Patch Set 4:

(15 comments)

I haven't finished reviewing yet, but I'm leaving a few comments.

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@47
PS4, Line 47: private AtomicInteger retries;
> Could this be local to each call to run()? If so, could it be non-atomic?
Yeah, it should be a local, non-atomic counter so that we retry 3 times for each message. With a shared counter we retry 3 times in total and then never retry again for the remaining lifetime of the process.

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@67
PS4, Line 67: public void run() {
We should probably split this method into 3 methods: take from the queue, process the message and build the response, then write to the output.

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@69
PS4, Line 69: try {
You can use try-with-resources here. If you do `try (BufferedOutputStream os = out)` it will automatically close the stream and you don't need to handle it in finally. Coupled with my suggestion in L64 you don't even need the finally block. Maybe we shouldn't even close the streams here, and instead use try-with-resources where we instantiate the consumers and producers: the streams could be shared between threads, in which case we shouldn't close them here.

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@70
PS4, Line 70: true
This should be `while (!Thread.currentThread().isInterrupted())` instead.
Calling isInterrupted() doesn't change the interrupted flag either, so there's no need for a separate interrupted flag in this scope.

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@77
PS4, Line 77: } catch (InterruptedException e) {
We shouldn't catch this here; we don't want to retry anything if the thread is interrupted. The point is to gracefully interrupt the process. In this case what we should do is jump to a catch of the outer try block, log that we were interrupted, and let the process exit.

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@94
PS4, Line 94: response = respondWithError(AppStatusPB.ErrorCode.ILLEGAL_STATE, responseBuilder);
I think we shouldn't respond here, as we don't have a sequence ID, so the server can't connect the response back to a request anyway. We should retry after a timeout on the server side instead.

http://gerrit.cloudera.org:8080/#/c/14329/1/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/1/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@160
PS1, Line 160:
> The given BufferedInputStream (or BufferedOutputStream) can be different ea
Wouldn't it still be better to instantiate a separate MessageProcessor for each IO stream?
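To make the retry and interruption comments on MessageConsumer.java a bit more concrete, here is a minimal sketch of the loop shape I have in mind. It is not the code from the patch: the class name `RetryLoopSketch`, the `String` payloads, the bounded output queue, and the 10 ms offer timeout are all made up for illustration. The retry budget is a local variable, so every message gets its own 3 attempts, and `InterruptedException` is only handled in the outer block.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; not the MessageConsumer from the patch. */
final class RetryLoopSketch implements Runnable {
  private static final int MAX_RETRIES = 3;         // assumed, matching the "3 times" above
  private static final long OFFER_TIMEOUT_MS = 10;  // assumed timeout per attempt

  private final BlockingQueue<String> inbound;
  private final BlockingQueue<String> outbound;

  RetryLoopSketch(BlockingQueue<String> inbound, BlockingQueue<String> outbound) {
    this.inbound = inbound;
    this.outbound = outbound;
  }

  @Override
  public void run() {
    try {
      // isInterrupted() only reads the flag, so no separate boolean is needed.
      while (!Thread.currentThread().isInterrupted()) {
        String request = inbound.take();     // may throw InterruptedException
        String response = "ack:" + request;  // stand-in for the real processing
        if (!offerWithRetries(response)) {
          System.err.println("dropping response after " + MAX_RETRIES + " attempts");
        }
      }
    } catch (InterruptedException e) {
      // No retry on interruption: restore the flag, log, and let the thread exit.
      Thread.currentThread().interrupt();
      System.err.println("interrupted, exiting");
    }
  }

  /** Tries to enqueue the response; the retry budget is local to this call. */
  boolean offerWithRetries(String response) throws InterruptedException {
    int retries = MAX_RETRIES;
    while (retries-- > 0) {
      if (outbound.offer(response, OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
        return true;
      }
    }
    return false;
  }
}
```

Because `retries` is reset on every call, one message that exhausts its attempts doesn't affect the next one, and an interrupt skips the retry path entirely.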
http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@75
PS4, Line 75: return data;
> nit: don't need the local variable -- just:
or just `return "";`

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@148
PS4, Line 148: private void writeBinaryMessage(BufferedOutputStream out,
This should be synchronized. Alternatively (and probably a better approach), we could have two queues: one for input and one for output. A MessageReader would read from the InputStream and put the messages on the input queue, the MessageProcessor would process the messages and put the responses on the output queue, and a MessageWriter would take the messages from the output queue and write them to the output stream. This way we can have a thread pool of MessageProcessors doing the actual work and a single MessageReader and MessageWriter taking care of IO on the pipe.

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@39
PS4, Line 39: should not be called concurrently unless handled by
            :  * the caller.
> I'm finding this a bit confusing: under what scenario can run() be called c
You could instantiate a bunch of MessageProducers in a thread pool.
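The two-queue layout suggested for writeBinaryMessage could look roughly like the sketch below. Everything in it is an assumption made for illustration: the class `TwoQueuePipelineSketch`, the 4-byte length prefix used as framing (the real protocol is protobuf-based), and the `handle()` placeholder that just upper-cases the payload. The point is only the threading shape: one reader and one writer own the streams, and a pool of workers sits between the two queues.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of the reader / processor pool / writer layout. */
final class TwoQueuePipelineSketch {
  /** Placeholder for the real request handling: upper-cases the payload. */
  static byte[] handle(byte[] request) {
    String text = new String(request, StandardCharsets.UTF_8);
    return text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
  }

  /** Starts 1 reader, `workers` processors and 1 writer; the caller stops the pool. */
  static ExecutorService start(InputStream rawIn, OutputStream rawOut, int workers) {
    BlockingQueue<byte[]> inputQueue = new LinkedBlockingQueue<>();
    BlockingQueue<byte[]> outputQueue = new LinkedBlockingQueue<>();
    ExecutorService pool = Executors.newFixedThreadPool(workers + 2);

    // Single reader: the only thread that touches the input stream.
    pool.execute(() -> {
      try {
        DataInputStream in = new DataInputStream(rawIn);
        while (!Thread.currentThread().isInterrupted()) {
          byte[] payload = new byte[in.readInt()];  // assumed 4-byte length prefix
          in.readFully(payload);
          inputQueue.put(payload);
        }
      } catch (EOFException e) {
        // The other side closed the pipe; nothing more to read.
      } catch (IOException e) {
        e.printStackTrace();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // Worker pool: no stream access, so no synchronization on IO is needed.
    for (int i = 0; i < workers; i++) {
      pool.execute(() -> {
        try {
          while (!Thread.currentThread().isInterrupted()) {
            outputQueue.put(handle(inputQueue.take()));
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    // Single writer: the only thread that touches the output stream.
    pool.execute(() -> {
      try {
        DataOutputStream out = new DataOutputStream(rawOut);
        while (!Thread.currentThread().isInterrupted()) {
          byte[] response = outputQueue.take();
          out.writeInt(response.length);
          out.write(response);
          out.flush();
        }
      } catch (IOException e) {
        e.printStackTrace();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    return pool;
  }
}
```

With more than one worker, responses can come out in a different order than the requests went in, so this layout relies on each response carrying enough information for the server to match it to its request.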
http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@70
PS4, Line 70: try {
same as MessageConsumer

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@71
PS4, Line 71: while (true) {
same as MessageConsumer

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@88
PS4, Line 88: messageProcessor.writeMessage(out, response);
Same as MessageConsumer: we don't have a sequence ID, so we should just log the exception and drop the request.

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@100
PS4, Line 100: while (retries.getAndDecrement() > 0) {
same as MessageConsumer

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@106
PS4, Line 106: interrupted = true;
We shouldn't retry after an interruption, as in this case the expected behavior is to gracefully interrupt. If there's an InterruptedException thrown by blockingQueue.put(), we should do `Thread.currentThread().interrupt()` and then break out of the retry loop.


--
To view, visit http://gerrit.cloudera.org:8080/14329
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982
Gerrit-Change-Number: 14329
Gerrit-PatchSet: 4
Gerrit-Owner: Hao Hao <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Grant Henke <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Comment-Date: Tue, 19 Nov 2019 16:04:09 +0000
Gerrit-HasComments: Yes
