Hao Hao has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/14329 )

Change subject: [java] KUDU-2791: process communicates via protobuf-based 
protocol
......................................................................


Patch Set 5:

(24 comments)

http://gerrit.cloudera.org:8080/#/c/14329/5//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/14329/5//COMMIT_MSG@22
PS5, Line 22: IOExcpetion
> nit: IOException
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@70
PS4, Line 70:
> hm... there's no way to gracefully shut down in this case though. If we don
As mentioned earlier, it is advised to 'Restore the interruption status so that 
code higher up on the call stack can deal with it' in Java Concurrency in 
Practice' Chapter 7 (p143).  Even though we don't consider InterruptedException 
from the BlockingQueue fatal, we still want the caller to know about it when 
fatal exception happen.

However, as you suggested to register a shutdown hook for orderly shutdown, we 
need to handle InterruptedException from ExecturorService.shutdownNow(), so 
updated it with your suggestion.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@148
PS4, Line 148:    */
> even if BufferedOutputStream synchronizes, write and flush, I think it shou
I updated as you suggested to synchronized on out.


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@77
PS5, Line 77: binary mode
> nit: "binary mode" doesn't exist in the context of the java subprocess anym
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@85
PS5, Line 85:    * @throws IllegalArgumentException if the serialization mode 
is invalid
            :    * @throws IllegalStateException if the message is malformed
> nit: remove this
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@88
PS5, Line 88: readMessage
> nit: how about calling this readBytes() or somesuch?
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@88
PS5, Line 88:   String readMessage() throws IOException {
> nit: Maybe we should add preconditions here that 'in' is set.
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@89
PS5, Line 89:     if (in.available() <= 0) {
> can this be negative? anyway, we should return "" instead of new String() s
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@117
PS5, Line 117:    * @throws IllegalArgumentException if the serialization mode 
is invalid
             :    * @throws InvalidProtocolBufferException if there are any 
unknown types
             :    *                                        in the message
> nit: remove this
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@122
PS5, Line 122:   void writeMessage(Message message) throws IOException {
> nit: Maybe we should add preconditions here that 'out' is set.
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@125
PS5, Line 125:  out.write(Bytes.concat(size, body));
             :     // Always do a flush after write to ensure no partial 
message is written.
             :     out.flush();
> these two statements should be synchronized on out.
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@131
PS5, Line 131: protobuf message with the given parser and builder.
> nit: we're not actually parsing a protobuf message -- we're parsing some by
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java:

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@33
PS5, Line 33: read
> nit: reads
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@60
PS5, Line 60: true
> this should be while (!Thread.currentThread().isInterrupted()). Right now t
Updated as calling ExecutorService.shutdownNow() may interrupt the thread.


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@94
PS5, Line 94:             LOG.debug("Message: {} has been put on the queue", 
data);
> this should be in an if (LOG.isDebugEnabled()) block so we don't format a s
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@96
PS5, Line 96:           } catch (InterruptedException e) {
> why are we retrying when the thread is interrupted? Shouldn't we simply int
I don't see it is necessary to cancel the activity that puts the message in the 
queue if it get blocked and interrupted immediately, it may success with 
following retry.  Moreover, even if 'blockingQueue.put' fail after several 
retries, we don't want a single message process failure to cause the program to 
fail. Or I am missing anything?


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java:

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@37
PS5, Line 37: write
> nit: writes
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@63
PS5, Line 63:       while (true) {
> this should be while (!Thread.currentThread().isInterrupted()) as well, sim
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java@69
PS5, Line 69:           LOG.debug("Message: {} has been taken from the queue", 
data);
> if (LOG.isDebugEnabled())
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java@88
PS4, Line 88:         consumerFuture.exceptionally(errorHandler);
> yeah we can skip shutdowns here, but maybe add them in a shutdown hook?
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java:

PS5:
> we should register a shutdown hook that shuts down the threadpools (shutdow
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java@67
PS5, Line 67:     Function<Throwable, Object> errorHandler = (t) -> {
            :       System.exit(1);
            :       return null;
            :     };
> nit: Could this be static?
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java@78
PS5, Line 78: producer
> nit: we've shifted over to calling these 'readers' and 'writers'. Could you
Done


http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java
File 
java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java:

http://gerrit.cloudera.org:8080/#/c/14329/5/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTest.java@69
PS5, Line 69: MessageProcessor messageProcessor = new MessageProcessor(
            :         SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT);
> Do we have to have a different constructor for this? Couldn't we just pass
Done



--
To view, visit http://gerrit.cloudera.org:8080/14329
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982
Gerrit-Change-Number: 14329
Gerrit-PatchSet: 5
Gerrit-Owner: Hao Hao <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Grant Henke <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Comment-Date: Mon, 16 Dec 2019 07:15:27 +0000
Gerrit-HasComments: Yes

Reply via email to