Hao Hao has posted comments on this change. ( http://gerrit.cloudera.org:8080/14329 )
Change subject: [java] KUDU-2971: process communicates via protobuf-based protocol
......................................................................

Patch Set 8:

(37 comments)

http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG@7
PS7, Line 7: KUDU-2971
> KUDU-2971
Done

http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG@9
PS7, Line 9: a std
> Should clarify here that because communication takes place over a stdin/std
Done

http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG@18
PS7, Line 18: from the queue, process them and write the responses to the standard
            : output.
            : 
> Curious why you didn't go with writing the messages to another blocking que
Makes sense. Would it be OK to address it in a follow-up patch, since this one is already big enough?

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/BasicSubprocessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/BasicSubprocessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/BasicSubprocessor.java@96
PS6, Line 96: 
            : 
            : 
> Ah, I was asking how this interacts, if at all, with ERROR_HANDLER, but bas
Ah, that is not only for the output stream; updated. Thanks!

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@44
PS6, Line 44: 
            :   MessageProcessor(long maxMessageBytes,
            :                    BufferedInputStream in,
            :                    BufferedOutputStream out) {
            :     this.maxMessageBytes = maxMessageBytes;
            :     this.in = in;
            :     this.out = out;
            :   }
            : 
            :   /**
            :    * Read a protobuf message, if any, from the underlying buffered input
            :    * stream. The read is not atomic (partial read can happen if any
            :    * exceptions occur) and blocking (waits until the input is available).
            :    *
            :    * @return the message in a byte array.
            :    * @throws EOFException if the end of the stream has been reached
            :    * @throws IOException if this input stream has been closed, an I/O
> I just requested constructors on the latest patch. I think one constructor
Done

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@92
PS6, Line 92:   private void doRead(byte bytes[], int size) throws EOFExcept
> IllegalStateException is a RuntimeException which is an "Unchecked" excepti
Ack

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@49
PS7, Line 49:     this.in = in;
> Can this be in the constructor instead? Mutable state like this can get tri
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@72
PS7, Line 72:           String.format("message size (%d) exceeds maximum message size (%d)",
            :                         size, maxMessageBytes));
            :     }
> Why do we do this? Don't we want to block waiting for additional input if n
Makes sense, updated.

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@83
PS7, Line 83:    * input stream into the specified byte array, starting at the offset
            :    * <code>0</code>. If fail to r
> Nit: reformat using String.format
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@90
PS7, Line 90:    * error occurs, or fail to read the specified size
> Maybe include how much was actually read?
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@91
PS7, Line 91:    */
> Why are we converting this into a UTF-8 String? It's a serialized PB messag
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@122
PS7, Line 122: 
> Why is this a String? Shouldn't it be a byte array?
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@133
PS7, Line 133:   }
             : 
             :   /**
> Nit: could combine:
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@145
PS7, Line 145: 
             :   /**
             :    * Convert a 32-bit integer to a four bytes array in big endian order.
             :    * @param value a 32-bit
> Nit: could combine:
Done

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java:

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@98
PS6, Line 98:     }
> I'm asking whether that's what we're doing. 'finally' blocks are run uncond
Yeah, this still interrupts the caller if it cares about InterruptedException; it follows the usual guideline for handling InterruptedException. However, as far as I understand, in this case the caller (CompletableFuture.runAsync in Subprocessor.java) does not check the interrupt flag, so it doesn't matter much whether we propagate it or not.
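For readers following the MessageProcessor threads above, here is a minimal sketch of size-prefixed framing, for illustration only. It assumes each message is a 4-byte big-endian size prefix followed by the serialized protobuf bytes; that matches the "32-bit integer ... in big endian order" helper quoted above but is not confirmed as the patch's wire format. The class and method names (FramedMessageIO, readMessage, writeMessage, doRead, intToBytes, bytesToInt) are hypothetical. The sketch keeps the body as a byte[] rather than a UTF-8 String, sets all state once in the constructor, blocks until input arrives, and reports how many bytes were actually read when the stream ends early.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical sketch of size-prefixed message framing over byte streams. */
public class FramedMessageIO {
  private final long maxMessageBytes;
  private final InputStream in;
  private final OutputStream out;

  // All state is set once in the constructor and never reassigned.
  public FramedMessageIO(long maxMessageBytes, InputStream in, OutputStream out) {
    this.maxMessageBytes = maxMessageBytes;
    this.in = in;
    this.out = out;
  }

  /**
   * Reads one message body as raw bytes (a serialized protobuf is not a UTF-8
   * String). Blocks until input is available; not atomic if an exception occurs.
   */
  public byte[] readMessage() throws IOException {
    byte[] sizeBytes = new byte[Integer.BYTES];
    doRead(sizeBytes, sizeBytes.length);
    int size = bytesToInt(sizeBytes);
    if (size < 0 || size > maxMessageBytes) {
      throw new IOException(String.format(
          "message size (%d) is negative or exceeds maximum message size (%d)",
          size, maxMessageBytes));
    }
    byte[] body = new byte[size];
    doRead(body, size);
    return body;
  }

  /** Writes the 4-byte size prefix followed by the body, then flushes. */
  public void writeMessage(byte[] body) throws IOException {
    out.write(intToBytes(body.length));
    out.write(body);
    out.flush();
  }

  /** Fills bytes[0..size), looping over partial reads; reports the count read on EOF. */
  private void doRead(byte[] bytes, int size) throws IOException {
    int read = 0;
    while (read < size) {
      int n = in.read(bytes, read, size - read); // blocks until some input arrives
      if (n == -1) {
        throw new EOFException(String.format(
            "expected %d bytes but read only %d before end of stream", size, read));
      }
      read += n;
    }
  }

  /** Converts a 32-bit integer to a four-byte array in big-endian order. */
  static byte[] intToBytes(int value) {
    return ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.BIG_ENDIAN).putInt(value).array();
  }

  static int bytesToInt(byte[] bytes) {
    return ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt();
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    new FramedMessageIO(1024, new ByteArrayInputStream(new byte[0]), wire)
        .writeMessage(new byte[] {1, 2, 3});
    FramedMessageIO reader = new FramedMessageIO(
        1024, new ByteArrayInputStream(wire.toByteArray()), new ByteArrayOutputStream());
    System.out.println(reader.readMessage().length); // prints 3
  }
}
```

Routing both the prefix and the body through one doRead loop keeps a short read from being mistaken for a complete message; as the quoted javadoc notes, the read is still not atomic if an exception occurs midway.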
http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@98
PS6, Line 98:     }
> Is there a test case around this retry/finally handling that verifies/show
Added one.

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@44
PS7, Line 44:   private BlockingQueue<byte[]> blockingQueue;
> Should be final too.
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@60
PS7, Line 60: 
            :     while (!Thread.currentThread().isInterrupted()) {
            :       // Read the message from the standard input. If f
> How is this safe? AFAICT, in most cases that readBytes() throws IllegalStat
Makes sense, updated.

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@84
PS7, Line 84:         // InterruptedException during the put, record the interruption
> Why do we retry if we've been interrupted? Isn't interruption a sign that t
In this particular case, I don't think we want to cancel the read/write task and exit the subprocess program because of an InterruptedException thrown from the blocking queue (e.g. when waiting to put the message onto the queue and getting interrupted). Therefore I chose to retry. But maybe you see a strong reason why we should exit in such cases?

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@98
PS7, Line 98:     }
> IIUC, interruption means we retry the put() but then exit the main loop. Wh
Hmm, I'm not quite following your question. Here we retry put() a certain number of times and then continue (we do not exit the main loop) whether the retry succeeds or fails. The retry is there to avoid losing a received message if possible.

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@60
PS7, Line 60:     boolean interrupted = false;
> Same questions about interruption as for MessageReader.
Ack

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@130
PS7, Line 130:    * and should be propagated to the code higher up on the call stack.
> I don't understand why we retry here at all, or what limits we're talking a
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java@64
PS7, Line 64: 
> Nit: the 'clazz' variant is only necessary for variable names, where 'class
Done

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessCommandLineParser.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessCommandLineParser.java:

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessCommandLineParser.java@69
PS6, Line 69:         QUEUE_SIZE_HAS_ARG,
> I think I agree with Andrew that we could start with command line arguments
I see, updated to remove the config file.
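The retry-and-restore pattern debated in the MessageReader and MessageWriter threads above can be sketched as follows. This is a hypothetical helper (putWithRetries and its maxRetries parameter are illustrative, not the patch's code): it retries put() a bounded number of times after an InterruptedException, then restores the thread's interrupt status in a finally block, which is the standard way to defer an interrupt without swallowing it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of a bounded put() retry that preserves interrupt status. */
public class InterruptiblePut {

  /**
   * Tries to enqueue the message, retrying up to maxRetries extra times if the
   * put is interrupted. Returns true if the message was enqueued.
   */
  static boolean putWithRetries(BlockingQueue<byte[]> queue, byte[] message, int maxRetries) {
    boolean interrupted = false;
    try {
      for (int attempt = 0; attempt <= maxRetries; attempt++) {
        try {
          queue.put(message);
          return true;
        } catch (InterruptedException e) {
          // put() clears the interrupt status when it throws; record it and retry.
          interrupted = true;
        }
      }
      return false; // every attempt was interrupted; the message is dropped
    } finally {
      // Restore the interrupt status so a caller that checks it still sees it.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) {
    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1, /* fair= */ true);
    Thread.currentThread().interrupt(); // simulate a pending interrupt
    boolean enqueued = putWithRetries(queue, new byte[] {42}, 3);
    // The first put() fails fast because of the pending interrupt; the retry succeeds.
    System.out.println(enqueued + " " + queue.size() + " " + Thread.interrupted());
    // prints "true 1 true"
  }
}
```

The trade-off the reviewers raised is visible here: retrying avoids dropping a message that was already read, but because the interrupt status is restored, a loop guarded by `while (!Thread.currentThread().isInterrupted())` (as quoted for MessageReader) would exit on its next check.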
http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java@35
PS7, Line 35:       String maxWriterThreads,
> This number feels really high to me, especially as a default. Is there any
Hmm, I don't have any data to back it up yet. Maybe start with 3 as the default for now?

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java@60
PS7, Line 60: 
> Nit: this (and associated fields) should be plural i.e. maxWriterThreads.
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java@69
PS7, Line 69: 
> Nit: add /* fair= */ here.
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java@79
PS7, Line 79:          BufferedOutputStream out = new BufferedOutputStream(System.out)) {
> Do we need to need to join on all of these futures? Via something like Comp
Yeah, right: with try-with-resources, we need to join on the futures. Updated. Thanks!

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/resources/log4j2.properties
File java/kudu-subprocess/src/main/resources/log4j2.properties:

PS7: 
> Every other log4j2.properties file also has this section:
I think that section specifies the log level for the org.apache.kudu package specifically. Since the debug level seems too low as a default, I skipped it here. Let me know if you think otherwise.
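The Subprocessor.java comments above (a fair queue, several writer threads, and joining the futures inside try-with-resources) can be illustrated with a sketch like the one below. It is an assumption-laden illustration, not the patch's code: the class name SubprocessPipelineSketch, the poll-based shutdown through a readerDone flag, and the echo "processing" are all hypothetical, and it reuses the assumed 4-byte big-endian size prefix via DataInputStream.readInt and DataOutputStream.writeInt. Failure handling is simplified; for example, a writer that fails is not replaced.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: one reader task feeding a fair queue drained by several writer tasks. */
public class SubprocessPipelineSketch {

  static void run(InputStream rawIn, OutputStream rawOut, int queueSize, int maxWriterThreads)
      throws IOException {
    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(queueSize, /* fair= */ true);
    AtomicBoolean readerDone = new AtomicBoolean(false);
    // One thread per task, so writers waiting on the queue cannot starve the reader.
    ExecutorService pool = Executors.newFixedThreadPool(maxWriterThreads + 1);
    try (DataInputStream in = new DataInputStream(rawIn);
         DataOutputStream out = new DataOutputStream(rawOut)) {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      futures.add(CompletableFuture.runAsync(() -> readLoop(in, queue, readerDone), pool));
      for (int i = 0; i < maxWriterThreads; i++) {
        futures.add(CompletableFuture.runAsync(() -> writeLoop(out, queue, readerDone), pool));
      }
      // Wait for every task before try-with-resources closes the streams.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } finally {
      pool.shutdown();
    }
  }

  private static void readLoop(DataInputStream in, BlockingQueue<byte[]> queue,
                               AtomicBoolean readerDone) {
    try {
      while (true) {
        int size;
        try {
          size = in.readInt(); // assumed 4-byte big-endian size prefix
        } catch (EOFException e) {
          return; // input ended before another complete size prefix
        }
        if (size < 0) {
          throw new IOException("negative message size: " + size);
        }
        byte[] body = new byte[size];
        in.readFully(body);
        queue.put(body);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      readerDone.set(true); // lets writers exit once the queue is drained
    }
  }

  private static void writeLoop(DataOutputStream out, BlockingQueue<byte[]> queue,
                                AtomicBoolean readerDone) {
    try {
      while (true) {
        byte[] request = queue.poll(10, TimeUnit.MILLISECONDS);
        if (request == null) {
          if (readerDone.get() && queue.isEmpty()) {
            return;
          }
          continue;
        }
        byte[] response = request.clone(); // hypothetical stand-in for real processing
        synchronized (out) { // writers share one stream; keep each frame contiguous
          out.writeInt(response.length);
          out.write(response);
          out.flush();
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Feeding run five framed one-byte requests with queueSize 2 and maxWriterThreads 3 should produce five framed responses (25 bytes in total), possibly in a different order since three writers drain the queue concurrently. Without the allOf(...).join() call, try-with-resources could close the streams while the tasks are still using them.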
http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java
File java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@33
PS7, Line 33: 
> Could you rename this to make it more clear that it's a test fixture? Maybe
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@38
PS7, Line 38: 
> Nit: for new files, try to use indicative tense ("Constructs a ...") vs. an
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@45
PS7, Line 45: 
> Nit: maybe 'create' or 'build' would be more precise?
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@46
PS7, Line 46: 
> I don't understand how this method is going to be extensible for other mess
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@68
PS7, Line 68: 
> Nit: maybe just serializeMessage?
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@78
PS7, Line 78: 
> Sort of expected this to be more symmetric with getSerializedMessage. Maybe
Sorry for the confusion; I updated it to be more symmetric and hope it is clearer now.

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@86
PS7, Line 86: 
> Nit: and deserializeMessage?
Done

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java
File java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java:

PS7: 
> New tests should include the RetryRule so that they properly retry on failu
Done

-- 
To view, visit http://gerrit.cloudera.org:8080/14329

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982
Gerrit-Change-Number: 14329
Gerrit-PatchSet: 8
Gerrit-Owner: Hao Hao <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Grant Henke <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Comment-Date: Mon, 13 Jan 2020 07:03:07 +0000
Gerrit-HasComments: Yes
