Hao Hao has posted comments on this change. ( http://gerrit.cloudera.org:8080/14329 )
Change subject: [java] KUDU-2791: process communicates via protobuf-based protocol
......................................................................


Patch Set 5:

(24 comments)

http://gerrit.cloudera.org:8080/#/c/14329/5//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/14329/5//COMMIT_MSG@22
PS5, Line 22: IOExcpetion
> nit: IOException
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@70
PS4, Line 70:
> hm... there's no way to gracefully shut down in this case though. If we don
As mentioned earlier, 'Java Concurrency in Practice', Chapter 7 (p. 143), advises to 'Restore the interruption status so that code higher up on the call stack can deal with it'. Even though we don't consider an InterruptedException from the BlockingQueue fatal, we still want the caller to know about it when a fatal exception happens. However, since you suggested registering a shutdown hook for orderly shutdown, we need to handle the InterruptedException that results from ExecutorService.shutdownNow(), so I updated it per your suggestion.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@148
PS4, Line 148: */
> even if BufferedOutputStream synchronizes, write and flush, I think it shou
I updated it as you suggested to synchronize on out.

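For readers following the 'synchronized on out' thread, here is a minimal sketch of the idea, not the code from the patch. The class name FramedWriter, the method writeFrame, and the 4-byte big-endian length prefix are assumptions made for illustration; the patch itself builds the frame with Bytes.concat(size, body).

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/** Hypothetical helper for illustration; not the MessageProcessor from the patch. */
final class FramedWriter {
  private final BufferedOutputStream out;

  FramedWriter(OutputStream sink) {
    this.out = new BufferedOutputStream(sink);
  }

  /**
   * Writes a length prefix followed by the body, then flushes.
   * The 4-byte big-endian prefix is an assumption of this sketch.
   */
  void writeFrame(byte[] body) throws IOException {
    // ByteBuffer defaults to big-endian byte order.
    byte[] frame = ByteBuffer.allocate(4 + body.length)
        .putInt(body.length)
        .put(body)
        .array();
    // Hold the stream's monitor across both calls so that no other thread
    // synchronizing on the same stream can slip a write in between this
    // write and its flush.
    synchronized (out) {
      out.write(frame);
      out.flush();
    }
  }
}
```

BufferedOutputStream already guards write and flush individually, but each call is a separate critical section; wrapping the pair in one synchronized block makes the two steps atomic with respect to other callers that lock the same object.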

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@77
PS5, Line 77: binary mode
> nit: "binary mode" doesn't exist in the context of the java subprocess anym
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@85
PS5, Line 85: * @throws IllegalArgumentException if the serialization mode is invalid
            : * @throws IllegalStateException if the message is malformed
> nit: remove this
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@88
PS5, Line 88: readMessage
> nit: how about calling this readBytes() or somesuch?
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@88
PS5, Line 88: String readMessage() throws IOException {
> nit: Maybe we should add preconditions here that 'in' is set.
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@89
PS5, Line 89: if (in.available() <= 0) {
> can this be negative?
> anyway, we should return "" instead of new String() s
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@117
PS5, Line 117: * @throws IllegalArgumentException if the serialization mode is invalid
             : * @throws InvalidProtocolBufferException if there are any unknown types
             : * in the message
> nit: remove this
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@122
PS5, Line 122: void writeMessage(Message message) throws IOException {
> nit: Maybe we should add preconditions here that 'out' is set.
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@125
PS5, Line 125: out.write(Bytes.concat(size, body));
             : // Always do a flush after write to ensure no partial message is written.
             : out.flush();
> these two statements should be synchronized on out.
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@131
PS5, Line 131: protobuf message with the given parser and builder.
> nit: we're not actually parsing a protobuf message -- we're parsing some by
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java:

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@33
PS5, Line 33: read
> nit: reads
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@60
PS5, Line 60: true
> this should be while (!Thread.currentThread().isInterrupted()). Right now t
Updated as calling ExecutorService.shutdownNow() may interrupt the thread.

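The loop condition discussed for MessageReader can be sketched roughly as follows. This is an illustrative sketch under stated assumptions, not the patch's code: QueueDrainer and its seen() accessor are hypothetical names, and the body simply takes strings from a BlockingQueue.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical worker for illustration; not MessageReader or MessageWriter. */
final class QueueDrainer implements Runnable {
  private final BlockingQueue<String> queue;
  private final List<String> seen = new CopyOnWriteArrayList<>();

  QueueDrainer(BlockingQueue<String> queue) {
    this.queue = queue;
  }

  /** Returns the items taken from the queue so far. */
  List<String> seen() {
    return seen;
  }

  @Override
  public void run() {
    // Checking the interrupt flag lets ExecutorService.shutdownNow(), which
    // interrupts worker threads, end the loop instead of spinning forever.
    while (!Thread.currentThread().isInterrupted()) {
      try {
        seen.add(queue.take());
      } catch (InterruptedException e) {
        // take() clears the flag when it throws, so set it again; the loop
        // condition and any code higher up the call stack can then see it.
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

With while (true), the same catch block would restore the flag and then immediately block or throw again on the next take(); tying the loop to the flag is what lets an interrupt actually stop the worker.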

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@94
PS5, Line 94: LOG.debug("Message: {} has been put on the queue", data);
> this should be in an if (LOG.isDebugEnabled()) block so we don't format a s
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@96
PS5, Line 96: } catch (InterruptedException e) {
> why are we retrying when the thread is interrupted? Shouldn't we simply int
I don't think it is necessary to cancel the activity that puts the message on the queue if it gets blocked and is interrupted immediately; it may succeed on a following retry. Moreover, even if 'blockingQueue.put' fails after several retries, we don't want a single message-processing failure to cause the program to fail. Or am I missing something?


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java:

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@37
PS5, Line 37: write
> nit: writes
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@63
PS5, Line 63: while (true) {
> this should be while (!Thread.currentThread().isInterrupted()) as well, sim
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@69
PS5, Line 69: LOG.debug("Message: {} has been taken from the queue", data);
> if (LOG.isDebugEnabled())
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java@88
PS4, Line 88: consumerFuture.exceptionally(errorHandler);
> yeah we can skip shutdowns here, but maybe add them in a shutdown hook?
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java:

PS5:
> we should register a shutdown hook that shuts down the threadpools (shutdow
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java@67
PS5, Line 67: Function<Throwable, Object> errorHandler = (t) -> {
            : System.exit(1);
            : return null;
            : };
> nit: Could this be static?
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java@78
PS5, Line 78: producer
> nit: we've shifted over to calling these 'readers' and 'writers'. Could you
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java
File java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java:

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@69
PS5, Line 69: MessageProcessor messageProcessor = new MessageProcessor(
            : SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT);
> Do we have to have a different constructor for this?
> Couldn't we just pass
Done



--
To view, visit http://gerrit.cloudera.org:8080/14329
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982
Gerrit-Change-Number: 14329
Gerrit-PatchSet: 5
Gerrit-Owner: Hao Hao <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Grant Henke <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Comment-Date: Mon, 16 Dec 2019 07:15:27 +0000
Gerrit-HasComments: Yes
