Hao Hao has posted comments on this change. ( http://gerrit.cloudera.org:8080/14329 )

Change subject: [java] KUDU-2971: process communicates via protobuf-based protocol
......................................................................


Patch Set 8:

(37 comments)

http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG@7
PS7, Line 7: KUDU-2971
> KUDU-2971
Done


http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG@9
PS7, Line 9: a std
> Should clarify here that because communication takes place over a stdin/std
Done


http://gerrit.cloudera.org:8080/#/c/14329/7//COMMIT_MSG@18
PS7, Line 18:     from the queue, process them and write the responses to the standard
            :     output.
            :
> Curious why you didn't go with writing the messages to another blocking que
Makes sense. I am thinking of addressing it in a follow-up patch, since this one
is already big enough. Is that OK?
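For reference, the alternative the reviewer raises (processing threads put their responses on a second blocking queue that a single writer drains) might look like the minimal sketch below. It is not the patch's code: the class name, the echo-style `process` step, the queue capacities, and the end-of-input marker are all assumptions, and an in-memory list stands in for standard output. Because only the writer thread touches the output, responses from different workers cannot be interleaved mid-message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipelineSketch {
  // Hypothetical end-of-input marker; compared by identity, never sent as a real message.
  private static final byte[] POISON = new byte[0];

  // Hypothetical processing step: a real subprocess would parse and handle the request.
  static byte[] process(byte[] request) {
    return request.clone();
  }

  /** Reader -> inbound queue -> N workers -> outbound queue -> single writer. */
  static List<byte[]> run(List<byte[]> requests, int workers) throws InterruptedException {
    BlockingQueue<byte[]> inbound = new ArrayBlockingQueue<>(16, /* fair= */ false);
    BlockingQueue<byte[]> outbound = new ArrayBlockingQueue<>(16, /* fair= */ false);
    List<byte[]> written = new ArrayList<>(); // stands in for standard output
    ExecutorService pool = Executors.newFixedThreadPool(workers + 2);
    try {
      CompletableFuture<Void> reader = CompletableFuture.runAsync(() -> {
        try {
          for (byte[] r : requests) inbound.put(r);
          for (int i = 0; i < workers; i++) inbound.put(POISON); // one stop marker per worker
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, pool);
      List<CompletableFuture<Void>> processors = new ArrayList<>();
      for (int i = 0; i < workers; i++) {
        processors.add(CompletableFuture.runAsync(() -> {
          try {
            for (byte[] m = inbound.take(); m != POISON; m = inbound.take()) {
              outbound.put(process(m)); // workers never touch the output directly
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }, pool));
      }
      CompletableFuture<Void> writer = CompletableFuture.runAsync(() -> {
        try {
          for (byte[] m = outbound.take(); m != POISON; m = outbound.take()) {
            written.add(m); // the only thread that "writes", so messages never interleave
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, pool);
      reader.join();
      CompletableFuture.allOf(processors.toArray(new CompletableFuture<?>[0])).join();
      outbound.put(POISON); // every response is queued; tell the writer to stop
      writer.join();
    } finally {
      pool.shutdown();
    }
    return written;
  }
}
```

For example, `PipelineSketch.run(requests, 3)` pushes each request through three workers; with more than one worker, responses may come back in a different order than the requests were read.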


http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/BasicSubprocessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/BasicSubprocessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/BasicSubprocessor.java@96
PS6, Line 96:
            :
            :
> Ah, I was asking how this interacts, if at all, with ERROR_HANDLER, but bas
Ah, that is not only for the output stream; updated. Thanks!


http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@44
PS6, Line 44:
            :   MessageProcessor(long maxMessageBytes,
            :                    BufferedInputStream in,
            :                    BufferedOutputStream out) {
            :     this.maxMessageBytes = maxMessageBytes;
            :     this.in = in;
            :     this.out = out;
            :   }
            :
            :   /**
            :    * Read a protobuf message, if any, from the underlying buffered input
            :    * stream. The read is not atomic (partial read can happen if any
            :    * exceptions occur) and blocking (waits until the input is available).
            :    *
            :    * @return the message in a byte array.
            :    * @throws EOFException if the end of the stream has been reached
            :    * @throws IOException if this input stream has been closed, an I/O
            :
> I just requested constructors on the latest patch. I think one constructor
Done


http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@92
PS6, Line 92:   private void doRead(byte bytes[], int size) throws EOFExcept
> IllegalStateException is a RuntimeException which is an "Unchecked" excepti
Ack


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@49
PS7, Line 49:     this.in = in;
> Can this be in the constructor instead? Mutable state like this can get tri
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@72
PS7, Line 72:           String.format("message size (%d) exceeds maximum message size (%d)",
            :                         size, maxMessageBytes));
            :     }
> Why do we do this? Don't we want to block waiting for additional input if n
Makes sense, updated.
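A minimal sketch of a length-prefixed read that enforces the maximum message size and then blocks until the whole body has arrived, rather than giving up when input is not yet available. It assumes a 4-byte big-endian size header (in line with the big-endian integer conversion discussed further down) and reuses the error text quoted above; the class name and constructor shape are hypothetical, not the patch's. `DataInputStream.readFully` blocks until the requested bytes are available and throws `EOFException` if the stream ends first.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class FramedReaderSketch {
  private final long maxMessageBytes;
  private final DataInputStream in;

  FramedReaderSketch(long maxMessageBytes, InputStream in) {
    this.maxMessageBytes = maxMessageBytes;
    this.in = new DataInputStream(in);
  }

  /**
   * Reads one message: a 4-byte big-endian size followed by that many bytes.
   * Blocks until the whole message is available instead of failing early.
   *
   * @return the message body
   * @throws EOFException if the stream ends before a complete message is read
   * @throws IllegalStateException if the size is negative or too large
   */
  byte[] readBytes() throws IOException {
    int size = in.readInt(); // blocks until 4 bytes arrive; EOFException at end of stream
    if (size < 0) {
      throw new IllegalStateException(String.format("invalid message size (%d)", size));
    }
    if (size > maxMessageBytes) {
      throw new IllegalStateException(String.format(
          "message size (%d) exceeds maximum message size (%d)", size, maxMessageBytes));
    }
    byte[] body = new byte[size];
    in.readFully(body); // blocks until 'size' bytes arrive; EOFException if truncated
    return body;
  }
}
```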


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@83
PS7, Line 83:    * input stream into the specified byte array, starting at the offset
            :    * <code>0</code>. If fail to r
> Nit: reformat using String.format
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@90
PS7, Line 90:    *                     error occurs, or fail to read the specified size
> Maybe include how much was actually read?
Done
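The suggestion to report how much was actually read can be sketched as a hand-written read loop. This is an illustration under assumptions, not the patch's `doRead`: the class name, the extra `InputStream` parameter, and the exact message wording are hypothetical. `InputStream.read(byte[], int, int)` may return fewer bytes than requested, so the loop keeps reading until the buffer is full or the stream ends (`-1`), and the count read so far goes into the exception message.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ReadLoopSketch {
  /**
   * Reads exactly {@code size} bytes into {@code bytes}, starting at offset 0.
   * Blocks until the bytes arrive; the read is not atomic, so on failure the
   * array may hold a partial message.
   */
  static void doRead(InputStream in, byte[] bytes, int size) throws IOException {
    int read = 0;
    while (read < size) {
      int n = in.read(bytes, read, size - read);
      if (n == -1) {
        throw new EOFException(String.format(
            "unable to read %d bytes: only %d bytes read before end of stream", size, read));
      }
      read += n;
    }
  }
}
```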


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@91
PS7, Line 91:    */
> Why are we converting this into a UTF-8 String? It's a serialized PB messag
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@122
PS7, Line 122:
> Why is this a String? Shouldn't it be a byte array?
Done
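The reason to keep the payload as a `byte[]` rather than a UTF-8 `String`: a serialized protobuf message is arbitrary binary data, and decoding bytes that are not valid UTF-8 replaces them with U+FFFD, so the round trip is lossy. A small self-contained demonstration (the class name is illustrative; `08 96 01` is the protobuf encoding of field 1 set to the varint 150):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Utf8RoundTripDemo {
  /** Decodes bytes as UTF-8 and re-encodes them, as a String-based pipeline would. */
  static byte[] roundTrip(byte[] data) {
    String s = new String(data, StandardCharsets.UTF_8); // invalid sequences become U+FFFD
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // 0x96 is a UTF-8 continuation byte with no lead byte, so it is malformed.
    byte[] payload = {(byte) 0x08, (byte) 0x96, (byte) 0x01};
    System.out.println(Arrays.equals(payload, roundTrip(payload))); // false
  }
}
```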


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@133
PS7, Line 133:   }
             :
             :   /**
> Nit: could combine:
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@145
PS7, Line 145:
             :   /**
             :    * Convert a 32-bit integer to a four bytes array in big endian order.
             :    * @param value a 32-bit
> Nit: could combine:
Done
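Converting a 32-bit integer to a four-byte big-endian array, as the quoted javadoc describes, can be done with `java.nio.ByteBuffer`, whose default byte order is big-endian; the reverse conversion reads it back. The class and method names below are hypothetical, not the patch's:

```java
import java.nio.ByteBuffer;

public class BigEndianSketch {
  /** Converts a 32-bit integer to a four-byte array in big-endian order. */
  static byte[] intToBytes(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  /** Converts a four-byte big-endian array back to a 32-bit integer. */
  static int bytesToInt(byte[] bytes) {
    if (bytes.length != Integer.BYTES) {
      throw new IllegalArgumentException("expected 4 bytes, got " + bytes.length);
    }
    return ByteBuffer.wrap(bytes).getInt();
  }
}
```

For example, `intToBytes(258)` yields `{0, 0, 1, 2}`, since 258 is `0x00000102`.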


http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java:

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@98
PS6, Line 98: }
> I'm asking whether that's what we're doing. 'finally' blocks are run uncond
Yeah, this still interrupts the caller if it cares about InterruptedException;
it follows the usual guideline for handling InterruptedException.
However, in this case the caller (CompletableFuture.runAsync in
Subprocessor.java) does not check the interrupt flag, as far as I understand,
so it doesn't matter much whether we propagate it or not.
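The pattern discussed here (remember an `InterruptedException`, then restore the thread's interrupt status in a `finally` block) can be sketched as follows. The sketch also illustrates the point about `CompletableFuture.runAsync`: a task that returns normally with the interrupt flag set still completes its future normally, so the restored flag only matters to code that checks it. The class name, the single-thread executor, and the simulated interrupt are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class RestoreInterruptSketch {
  /** Takes one message; if interrupted, returns null and re-sets the interrupt flag. */
  static byte[] takeRestoringInterrupt(BlockingQueue<byte[]> queue) {
    boolean interrupted = false;
    try {
      return queue.take();
    } catch (InterruptedException e) {
      interrupted = true; // take() cleared the flag when it threw
      return null;
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // callers that care can still see it
      }
    }
  }

  /** Returns {flag set when the task finished, future completed exceptionally}. */
  static boolean[] runDemo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    AtomicBoolean flagSetAtExit = new AtomicBoolean();
    try {
      CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
        Thread.currentThread().interrupt(); // simulate an interrupt before the blocking call
        takeRestoringInterrupt(new ArrayBlockingQueue<>(1));
        flagSetAtExit.set(Thread.currentThread().isInterrupted());
      }, pool);
      f.join(); // completes normally: runAsync does not inspect the interrupt flag
      return new boolean[] {flagSetAtExit.get(), f.isCompletedExceptionally()};
    } finally {
      pool.shutdown();
    }
  }
}
```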


http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@98
PS6, Line 98: }
> Is there a test case around this retry/finally handling that verifies/show
Added one.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@44
PS7, Line 44:   private BlockingQueue<byte[]> blockingQueue;
> Should be final too.
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@60
PS7, Line 60:
            :       while (!Thread.currentThread().isInterrupted()) {
            :         // Read the message from the standard input. If f
> How is this safe? AFAICT, in most cases that readBytes() throws IllegalStat
Makes sense, updated.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@84
PS7, Line 84:         // InterruptedException during the put, record the interruption
> Why do we retry if we've been interrupted? Isn't interruption a sign that t
In this particular case, I don't think we would want to cancel the read/write
task and exit the subprocess program due to an InterruptedException thrown from
the blocking queue (e.g. when waiting to put the message on the queue and
getting interrupted). Therefore I chose to retry. But maybe you see a strong
reason we should exit in such cases?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@98
PS7, Line 98:             }
> IIUC, interruption means we retry the put() but then exit the main loop. Wh
Hmm, I'm not quite following your question. Here we retry put() a certain
number of times and then continue (we do not exit the main loop), whether the
retry succeeds or fails. The retry is there to avoid losing a received message
if possible.
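The retry behavior described in this reply (retry `put()` a bounded number of times after an interruption, then let the main loop continue whether or not the message was enqueued) might look like the sketch below. The attempt limit, class name, and boolean return value are assumptions for illustration, not the patch's code.

```java
import java.util.concurrent.BlockingQueue;

public class RetryPutSketch {
  /**
   * Tries to enqueue a message, retrying up to {@code maxAttempts} times if the
   * blocking put() is interrupted. Returns true if the message was enqueued.
   */
  static boolean putWithRetry(BlockingQueue<byte[]> queue, byte[] message, int maxAttempts) {
    boolean interrupted = false;
    try {
      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          queue.put(message);
          return true;
        } catch (InterruptedException e) {
          interrupted = true; // remember it; put() cleared the flag when it threw
        }
      }
      return false; // give up on this message; the caller's loop carries on
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // restore the interrupt status once, at the end
      }
    }
  }
}
```

Restoring the flag only after the loop is deliberate: re-setting it inside the loop would make every subsequent `put()` throw immediately, so the retries could never succeed.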


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@60
PS7, Line 60:     boolean interrupted = false;
> Same questions about interruption as for MessageReader.
Ack


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@130
PS7, Line 130:    * and should be propagated to the code higher up on the call stack.
> I don't understand why we retry here at all, or what limits we're talking a
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java@64
PS7, Line 64:
> Nit: the 'clazz' variant is only necessary for variable names, where 'class
Done


http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessCommandLineParser.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessCommandLineParser.java:

http://gerrit.cloudera.org:8080/#/c/14329/6/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessCommandLineParser.java@69
PS6, Line 69:                                      QUEUE_SIZE_HAS_ARG,
> I think I agree with Andrew that we could start with command line arguments
I see, updated to remove the config file.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java@35
PS7, Line 35:                                  String maxWriterThreads,
> This number feels really high to me, especially as a default. Is there any
Hmm, I don't have any data to back it up yet. Maybe start with 3 as the
default for now?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java@60
PS7, Line 60:
> Nit: this (and associated fields) should be plural i.e. maxWriterThreads.
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java@69
PS7, Line 69:
> Nit: add /* fair= */ here.
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/Subprocessor.java@79
PS7, Line 79:          BufferedOutputStream out = new BufferedOutputStream(System.out)) {
> Do we need to need to join on all of these futures? Via something like Comp
Yeah, right: with try-with-resources, we need to join on the futures.
Updated. Thanks!
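Why the join matters: try-with-resources closes the streams as soon as the block exits, so if the asynchronous reader and writer tasks are started inside it, the block has to wait for them, for example via `CompletableFuture.allOf(...).join()`. A minimal sketch, in which in-memory streams stand in for the standard streams and a byte-copying reader/writer pair connected by a queue stands in for the real tasks (class and method names are hypothetical):

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JoinFuturesSketch {
  private static final int EOF = -1; // InputStream.read() returns 0..255 for data bytes

  static byte[] pipe(byte[] input) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(8, /* fair= */ false);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try (BufferedInputStream in = new BufferedInputStream(new ByteArrayInputStream(input));
         BufferedOutputStream out = new BufferedOutputStream(sink)) {
      CompletableFuture<Void> reader = CompletableFuture.runAsync(() -> {
        try {
          int b;
          do {
            b = in.read();
            queue.put(b); // EOF is forwarded as the end marker
          } while (b != EOF);
        } catch (IOException e) {
          // Simplified: a real reader should also signal the writer on failure.
          throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, pool);
      CompletableFuture<Void> writer = CompletableFuture.runAsync(() -> {
        try {
          for (int b = queue.take(); b != EOF; b = queue.take()) out.write(b);
          out.flush();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, pool);
      // Wait for both tasks before the try block closes 'in' and 'out'.
      CompletableFuture.allOf(reader, writer).join();
    } finally {
      pool.shutdown();
    }
    return sink.toByteArray();
  }
}
```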


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/resources/log4j2.properties
File java/kudu-subprocess/src/main/resources/log4j2.properties:

PS7:
> Every other log4j2.properties file also has this section:
I think that section specifies the log level for the org.apache.kudu package
specifically. Since the debug level seems too low as a default, I skipped it
here. Let me know if you think otherwise.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java
File java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@33
PS7, Line 33:
> Could you rename this to make it more clear that it's a test fixture? Maybe
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@38
PS7, Line 38:
> Nit: for new files, try to use indicative tense ("Constructs a ...") vs. an
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@45
PS7, Line 45:
> Nit: maybe 'create' or 'build' would be more precise?
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@46
PS7, Line 46:
> I don't understand how this method is going to be extensible for other mess
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@68
PS7, Line 68:
> Nit: maybe just serializeMessage?
Done


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@78
PS7, Line 78:
> Sort of expected this to be more symmetric with getSerializedMessage. Maybe
Sorry for the confusion; I updated it to be more symmetric and hope it is
clearer now.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@86
PS7, Line 86:
> Nit: and deserializeMessage?
Done
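A symmetric pair of test helpers in the spirit of the renamed `serializeMessage`/`deserializeMessage` could look like the sketch below. It rests on assumptions: it frames a raw `byte[]` payload (rather than a protobuf message) with a 4-byte big-endian length, and the class name and signatures are hypothetical. Each helper undoes exactly what the other does, which keeps round-trip assertions in tests straightforward.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MessageTestFixtureSketch {
  /** Serializes a payload as a 4-byte big-endian length followed by the payload bytes. */
  static byte[] serializeMessage(byte[] payload) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(payload.length); // DataOutputStream writes ints high byte first
      out.write(payload);
    }
    return bytes.toByteArray();
  }

  /** Deserializes bytes produced by serializeMessage back into the payload. */
  static byte[] deserializeMessage(byte[] framed) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed))) {
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      return payload;
    }
  }
}
```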


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java
File java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java:

PS7:
> New tests should include the RetryRule so that they properly retry on failu
Done



--
To view, visit http://gerrit.cloudera.org:8080/14329

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982
Gerrit-Change-Number: 14329
Gerrit-PatchSet: 8
Gerrit-Owner: Hao Hao <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Grant Henke <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Comment-Date: Mon, 13 Jan 2020 07:03:07 +0000
Gerrit-HasComments: Yes
