Adar Dembo has posted comments on this change. ( http://gerrit.cloudera.org:8080/14329 )
Change subject: [java] KUDU-2791: process communicates via protobuf-based protocol
......................................................................


Patch Set 7:

(29 comments)

http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG@9
PS7, Line 9: other
Should clarify here that because communication takes place over a stdin/stdout pipe, "other" really means whichever process spawns the Java process.


http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG@18
PS7, Line 18: 2) multiple writer threads that continuously retrieve the messages
            : from the queue, process them and write the responses to the standard
            : output.
Curious why you didn't go with writing the messages to another blocking queue, and using a separate thread to then poll that queue and write its messages to stdout. The symmetry of such an approach with #1 could mean more reusable code.

Something to consider: if stdout fills up (because the spawning process isn't consuming it fast enough), anyone who tries to write to it will block. With the current design, that means blocking the writing threads and request processing. With the change I'm proposing, only the single "responding" thread blocks and the other threads are free to continue processing requests and posting the responses to the second blocking queue. That is, instead of blocking all request processing, we'll just add more and more items to the queue.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@72
PS7, Line 72: if (in.available() <= 0) {
            :   return "";
            : }
Why do we do this? Don't we want to block waiting for additional input if none exists?
Looks like MessageReader will spin in a tight loop in this case anyway, which doesn't seem right.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@83
PS7, Line 83: "message size (" + size + ") exceeds maximum message size (" +
            : maxMessageBytes + ")"
Nit: reformat using String.format.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@90
PS7, Line 90: "unable to receive message body");
Maybe include how much was actually read?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@91
PS7, Line 91: return new String(dataBytes, StandardCharsets.UTF_8);
Why are we converting this into a UTF-8 String? It's a serialized PB message, right? Shouldn't it be a byte array?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@122
PS7, Line 122: String
Why is this a String? Shouldn't it be a byte array?
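To make the MessageProcessor comments above concrete, here is a minimal sketch of a read path that blocks instead of polling in.available(), returns the raw serialized bytes rather than a String, and uses String.format with the byte count actually read. It assumes the 4-byte big-endian length prefix implied by the ByteBuffer helpers at lines 133 and 145; the class and method names (BlockingMessageReader, readMessage, readUpTo) are hypothetical.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical blocking reader for 4-byte big-endian length-prefixed messages. */
final class BlockingMessageReader {
  private final InputStream in;
  private final int maxMessageBytes;

  BlockingMessageReader(InputStream in, int maxMessageBytes) {
    this.in = in;
    this.maxMessageBytes = maxMessageBytes;
  }

  /**
   * Blocks until a whole message is available and returns its serialized bytes.
   * Returns null on a clean EOF between messages (the spawning process closed the pipe).
   */
  byte[] readMessage() throws IOException {
    byte[] header = new byte[Integer.BYTES];
    int headerRead = readUpTo(header);
    if (headerRead == 0) {
      return null;  // clean EOF: no partial message was consumed
    }
    if (headerRead < header.length) {
      throw new EOFException(String.format(
          "unable to receive message header: read %d of %d bytes",
          headerRead, header.length));
    }
    int size = ByteBuffer.wrap(header).order(ByteOrder.BIG_ENDIAN).getInt();
    if (size < 0 || size > maxMessageBytes) {
      throw new IllegalStateException(String.format(
          "message size (%d) is negative or exceeds maximum message size (%d)",
          size, maxMessageBytes));
    }
    byte[] body = new byte[size];
    int bodyRead = readUpTo(body);
    if (bodyRead < size) {
      throw new EOFException(String.format(
          "unable to receive message body: read %d of %d bytes", bodyRead, size));
    }
    return body;
  }

  /** Blocks until buf is full or EOF is reached; returns the number of bytes read. */
  private int readUpTo(byte[] buf) throws IOException {
    int total = 0;
    while (total < buf.length) {
      int n = in.read(buf, total, buf.length - total);  // blocks; -1 means EOF
      if (n < 0) {
        break;
      }
      total += n;
    }
    return total;
  }
}
```

Returning null on EOF would give MessageReader a natural exit condition instead of a spin loop; the caller could then hand the bytes straight to the PB parser.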
http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@133
PS7, Line 133: ByteBuffer buffer = ByteBuffer.wrap(data)
             :     .order(ByteOrder.BIG_ENDIAN);
             : return buffer.getInt();
Nit: could combine:

  return ByteBuffer.wrap(...).order(...).getInt();


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@145
PS7, Line 145: ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES)
             :     .order(ByteOrder.BIG_ENDIAN)
             :     .putInt(value);
             : return buffer.array();
Nit: could combine:

  return ByteBuffer.allocate(...).order(...).putInt(...).array();


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@44
PS7, Line 44: private static int RETRY_NUM = 3;
Should be final too.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@60
PS7, Line 60: If IllegalStateException occur,
            : // which indicates the message is malformed, respond with an error message
            : // and continue to the next element on the queue.
How is this safe? AFAICT, in most cases where readBytes() throws IllegalStateException, there's a good chance that it didn't fully process a message from the pipe, which would interfere with any future calls to readBytes. Nor could it process it: the two ends of the pipe aren't speaking the same language.

I guess what I'm saying is this seems like it should be a fatal error.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@84
PS7, Line 84: // InterruptedException, record the interruption status and retry.
Why do we retry if we've been interrupted? Isn't interruption a sign that the subprocess should exit?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@98
PS7, Line 98: interrupted = true;
IIUC, interruption means we retry the put() but then exit the main loop. Why do we bother with the put at all in this case?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@60
PS7, Line 60: public void run() {
Same questions about interruption as for MessageReader.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@130
PS7, Line 130: * message write is atomic, retries on IOException until reaching the limits.
I don't understand why we retry here at all, or what limits we're talking about. If stdout is full, attempts to write to it will block. And that's fine, isn't it?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java@64
PS7, Line 64: getRequestClazz
Nit: the 'clazz' variant is only necessary for variable names, where 'class' is a reserved word. You can use 'class' inside a function name though; getRequestClass is perfectly legal.
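Tying together the COMMIT_MSG@18 suggestion and the interruption/retry questions above, here is a minimal sketch of a single "responding" thread. It assumes processing threads put serialized responses on a second BlockingQueue and that responses use the same 4-byte big-endian length prefix; ResponseWriter and its fields are hypothetical names. Writes simply block when the pipe is full (no retry), and interruption is treated as a request to exit.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sole writer to the output pipe; processing threads only enqueue. */
final class ResponseWriter implements Runnable {
  private final BlockingQueue<byte[]> outbound;
  private final OutputStream out;

  ResponseWriter(BlockingQueue<byte[]> outbound, OutputStream out) {
    this.outbound = outbound;
    this.out = out;
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        byte[] response = outbound.take();  // blocks until a response is queued
        writeFramed(response);              // blocks if the pipe is full; no retry
      }
    } catch (InterruptedException e) {
      // Interruption means "shut down": restore the flag and exit the loop.
      Thread.currentThread().interrupt();
    } catch (IOException e) {
      // A broken pipe is not retried; surface it as an unchecked failure.
      throw new UncheckedIOException(e);
    }
  }

  private void writeFramed(byte[] body) throws IOException {
    out.write(ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.BIG_ENDIAN)
        .putInt(body.length).array());
    out.write(body);
    out.flush();
  }
}
```

With this split, a slow consumer on the other end of the pipe only stalls this one thread; the processing threads keep draining requests and the backlog accumulates in the outbound queue instead.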
http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessCommandLineParser.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessCommandLineParser.java:

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessCommandLineParser.java@69
PS6, Line 69: String confFilePath = cmd.getOptionValue(CONF_LONG_OPTION);
> I'm imagining how this would work end-to-end and using config files, I thin
I agree with Andrew that the configuration for the _subprocess itself_ ought to be expressed as arguments rather than a file. Here's another argument: you already wrote some code to parse command line args just to extract the config file arg; why not reuse that code to parse additional args and drop the config file loading stuff?

BTW, I expect that subprocess (and Ranger and Atlas and ...) configuration will be fully managed by the Kudu master. If not, we can't rev "both sides" and we suddenly have to worry about backwards compatibility across this interface.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java@60
PS7, Line 60: getMaxWriterThread
Nit: this (and associated fields) should be plural, i.e. maxWriterThreads.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java@69
PS7, Line 69: true
Nit: add /* fair= */ here.
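For the two naming nits above, a minimal sketch of what the call sites might look like, assuming the inbound queue is an ArrayBlockingQueue and the writer threads come from a fixed-size pool; QueueSetup, newInboundQueue, newWriterPool and queueSize are hypothetical names. The inline /* fair= */ comment documents what the bare boolean means.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helpers showing the suggested naming and argument comments. */
final class QueueSetup {
  static BlockingQueue<byte[]> newInboundQueue(int queueSize) {
    // With fairness enabled, threads blocked in put()/take() are served in FIFO order.
    return new ArrayBlockingQueue<>(queueSize, /* fair= */ true);
  }

  // Plural parameter name, matching the suggested maxWriterThreads field.
  static ExecutorService newWriterPool(int maxWriterThreads) {
    return Executors.newFixedThreadPool(maxWriterThreads);
  }
}
```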
http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java@79
PS7, Line 79: try (BufferedInputStream in = new BufferedInputStream(System.in);
Do we need to join on all of these futures? Via something like CompletableFuture.allOf()?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/resources/log4j2.properties
File java/kudu-subprocess/src/main/resources/log4j2.properties:

PS7:
Every other log4j2.properties file also has this section:

  logger.kudu.name = org.apache.kudu
  logger.kudu.level = debug

What does it do? Should we add it here for consistency?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java
File java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@33
PS7, Line 33: public class MessageTest {
Could you rename this to make it more clear that it's a test fixture? Maybe "BaseMessageTest"? Also, could you add some docs?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@38
PS7, Line 38: * Construct a SubprocessRequestPB message based on the given request
Nit: for new files, try to use indicative tense ("Constructs a ...") vs. an imperative tense ("Construct a ..."). For existing files it's best to just match the existing style. There's more on this in the Google Style Guide (https://google.github.io/styleguide/cppguide.html#Function_Comments).


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@45
PS7, Line 45: get
Nit: maybe 'create' or 'build' would be more precise?
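Regarding the Subprocessor.java@79 question about joining on the futures, a minimal sketch of what that might look like with CompletableFuture.allOf(), assuming the reader and writer tasks are plain Runnables submitted to a shared Executor; SubprocessRunner and runAll are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical helper that starts every task and waits for all of them. */
final class SubprocessRunner {
  /**
   * Runs each task on the executor and blocks until all have completed.
   * If any task threw, join() throws a CompletionException wrapping that failure.
   */
  static void runAll(Executor executor, List<Runnable> tasks) {
    CompletableFuture<?>[] futures = new CompletableFuture<?>[tasks.size()];
    for (int i = 0; i < tasks.size(); i++) {
      futures[i] = CompletableFuture.runAsync(tasks.get(i), executor);
    }
    CompletableFuture.allOf(futures).join();
  }
}
```

One caveat with this shape: allOf() only completes once every task has finished, so if the reader dies early the writers keep running until something else stops them. Whether that's acceptable probably depends on how fatal errors (see the MessageReader@60 comment) end up being handled.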
http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@46
PS7, Line 46: String payload, String messageType) {
I don't understand how this method is going to be extensible for other message types. It's very likely that only the echo message type will have a single "payload" argument; other message types will no doubt be richer and accept more arguments. If extensibility isn't a goal, perhaps rename it to createEchoSubprocessRequest, drop the message type arg, and special-purpose it as a test-only method for echo messages?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@68
PS7, Line 68: getSerializedMessage
Nit: maybe just serializeMessage?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@78
PS7, Line 78: * Read the given message in byte array from a output stream and returns
Sort of expected this to be more symmetric with getSerializedMessage. Maybe I'll understand why they're not if I keep reading...


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@86
PS7, Line 86: <T extends Message> T getDeserializedMessage(byte[] bytes,
Nit: and deserializeMessage?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java
File java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java:

PS7:
New tests should include the RetryRule so that they properly retry on failure (if flaky) and report their test results.
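On the symmetry point for serializeMessage/deserializeMessage, a minimal sketch of what a symmetric pair in a BaseMessageTest fixture might look like. To stay self-contained it frames raw byte arrays with a 4-byte big-endian length prefix (an assumption carried over from the ByteBuffer helpers); a real fixture would presumably take a protobuf Message and call toByteArray() on the way in and a Parser's parseFrom(byte[]) on the way out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical test fixture with symmetric framing helpers. */
abstract class BaseMessageTest {
  /** Serializes a message body into a 4-byte big-endian length-prefixed frame. */
  static byte[] serializeMessage(byte[] body) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(body.length);  // DataOutputStream writes the high byte first
    out.write(body);
    out.flush();
    return bytes.toByteArray();
  }

  /** Deserializes a frame produced by serializeMessage() back into its body. */
  static byte[] deserializeMessage(byte[] frame) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
    int size = in.readInt();
    byte[] body = new byte[size];
    in.readFully(body);  // throws EOFException if the frame is truncated
    return body;
  }
}
```

Because each helper is the exact inverse of the other, a round-trip test (deserializeMessage(serializeMessage(x)) equals x) falls out naturally.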
--
To view, visit http://gerrit.cloudera.org:8080/14329
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982
Gerrit-Change-Number: 14329
Gerrit-PatchSet: 7
Gerrit-Owner: Hao Hao <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Grant Henke <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Comment-Date: Wed, 18 Dec 2019 19:32:08 +0000
Gerrit-HasComments: Yes
