Attila Bukor has posted comments on this change. ( http://gerrit.cloudera.org:8080/14329 )

Change subject: [java] KUDU-2791: process communicates via protobuf-based protocol
......................................................................


Patch Set 4:

(15 comments)

I haven't finished reviewing yet, but I'm leaving a few comments.

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@47
PS4, Line 47:   private AtomicInteger retries;
> Could this be local to each call to run()? If so, could it be non-atomic?
Yes, it should be a local, non-atomic variable, so that we retry 3 times per 
message instead of 3 times total and then never retry anything again for the 
remaining lifetime of the process.
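A minimal sketch of the per-message budget, assuming a hypothetical `callWithRetries()` helper that wraps whatever step is being retried: because the counter is a plain local `int`, every call starts with a fresh budget, and no atomic is needed since only the calling thread touches it.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: retries one operation up to MAX_RETRIES times.
class PerMessageRetry {
  static final int MAX_RETRIES = 3; // mirrors the "3 times" above

  static <T> T callWithRetries(Callable<T> op) throws Exception {
    int retries = MAX_RETRIES; // local, non-atomic: a fresh budget per message
    Exception last = null;
    while (retries-- > 0) {
      try {
        return op.call();
      } catch (InterruptedException e) {
        throw e; // never retry an interruption
      } catch (Exception e) {
        last = e; // remember the failure and try again
      }
    }
    throw last; // this message's budget is exhausted
  }
}
```

With the old shared `AtomicInteger`, a second message would find the counter already at zero; here both calls get their own three attempts.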


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@67
PS4, Line 67:   public void run() {
We should probably split this method into three: one that takes a message from 
the queue, one that processes it and builds the response, and one that writes 
the response to the output.
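One possible shape for that split, as a sketch: the method names `takeMessage()`, `processMessage()` and `writeResponse()` are hypothetical, and `String` stands in for the protobuf request and response types.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;

class SplitConsumer implements Runnable {
  private final BlockingQueue<String> inbound;
  private final OutputStream out;

  SplitConsumer(BlockingQueue<String> inbound, OutputStream out) {
    this.inbound = inbound;
    this.out = out;
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        writeResponse(processMessage(takeMessage()));
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // exit; the flag stays set
    } catch (IOException e) {
      // real code would log the failed write here
    }
  }

  // 1. Take the next request off the queue (blocks until one arrives).
  String takeMessage() throws InterruptedException {
    return inbound.take();
  }

  // 2. Process the request and build the response (no IO involved).
  String processMessage(String request) {
    return "echo:" + request;
  }

  // 3. Write the response to the output.
  void writeResponse(String response) throws IOException {
    out.write((response + "\n").getBytes(StandardCharsets.UTF_8));
    out.flush();
  }
}
```

Keeping `processMessage()` free of IO also makes it easy to unit test without any streams or threads.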


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@69
PS4, Line 69:     try {
You can use try-with-resources here. If you write `try (BufferedOutputStream os = 
out)`, the stream is closed automatically and you don't need to handle it in 
`finally`. Coupled with my suggestion in L64, you don't even need the `finally` 
block.

Maybe we shouldn't close the streams here at all, and should instead use 
try-with-resources where we instantiate the consumers and producers: the streams 
could be shared between threads, in which case no single consumer or producer 
should close them.
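A sketch of the second option, assuming a hypothetical `serve()` entry point that creates the buffered streams: the code that creates the streams owns them and closes them with try-with-resources once every thread using them has finished, while the worker (`copyOneChunk()`, also hypothetical) never closes what it was handed.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class StreamOwner {
  // Hypothetical worker: uses the shared streams but never closes them.
  static void copyOneChunk(InputStream in, OutputStream out) throws IOException {
    byte[] buf = new byte[64];
    int n = in.read(buf);
    if (n > 0) {
      out.write(buf, 0, n);
      out.flush();
    }
  }

  // The creator owns the streams: try-with-resources closes both (in reverse
  // order of declaration) only after the worker thread has finished.
  static void serve(InputStream rawIn, OutputStream rawOut)
      throws IOException, InterruptedException {
    try (BufferedInputStream in = new BufferedInputStream(rawIn);
         BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
      Thread worker = new Thread(() -> {
        try {
          copyOneChunk(in, out);
        } catch (IOException e) {
          // real code would log this
        }
      });
      worker.start();
      worker.join(); // the streams stay open until the worker is done
    }
  }
}
```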


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@70
PS4, Line 70: true
This should be `while (!Thread.currentThread().isInterrupted())` instead. Unlike 
the static `Thread.interrupted()`, `isInterrupted()` doesn't clear the interrupted 
flag, so there's no need for a separate `interrupted` flag in this scope.
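A small sketch of the loop condition (the `runUntilInterrupted()` helper is hypothetical): the loop exits as soon as the flag is set, and the flag is still set afterwards because `isInterrupted()` only reads it.

```java
// Hypothetical worker loop that polls the interrupt flag directly.
class InterruptLoopDemo {
  // interruptAfter must be positive, otherwise the loop never ends.
  static int runUntilInterrupted(int interruptAfter) {
    int iterations = 0;
    // isInterrupted() only reads the flag; the static Thread.interrupted()
    // would also clear it, hiding the interrupt from code after the loop.
    while (!Thread.currentThread().isInterrupted()) {
      iterations++; // stand-in for one unit of non-blocking work
      if (iterations == interruptAfter) {
        // Simulates another thread calling worker.interrupt().
        Thread.currentThread().interrupt();
      }
    }
    return iterations;
  }
}
```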


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@77
PS4, Line 77:         } catch (InterruptedException e) {
We shouldn't catch this here: we don't want to retry anything if the thread is 
interrupted, because the point of an interrupt is to stop the process 
gracefully. Instead, the exception should propagate to a catch on the outer try 
block, which logs that we were interrupted and lets the process exit.
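A sketch of that structure, assuming a hypothetical `GracefulConsumer` that reads from a `BlockingQueue`: `take()` is not wrapped in its own try/catch, so an interrupt while waiting jumps straight to the single outer catch, which logs and exits without retrying.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

class GracefulConsumer implements Runnable {
  private static final Logger LOG = Logger.getLogger(GracefulConsumer.class.getName());
  private final BlockingQueue<String> queue;
  final AtomicInteger processed = new AtomicInteger();

  GracefulConsumer(BlockingQueue<String> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        String msg = queue.take(); // deliberately no inner try/catch
        handle(msg);
      }
    } catch (InterruptedException e) {
      // One place to handle interruption: log it, restore the flag, exit.
      LOG.info("Interrupted while waiting for a message; shutting down");
      Thread.currentThread().interrupt();
    }
  }

  private void handle(String msg) {
    processed.incrementAndGet(); // stand-in for real processing of msg
  }
}
```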


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@94
PS4, Line 94:           response = respondWithError(AppStatusPB.ErrorCode.ILLEGAL_STATE, responseBuilder);
I don't think we should respond here: we don't have a sequence ID, so the server 
couldn't match the response to a request anyway. The server should instead retry 
after a timeout.
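A sketch of the log-and-drop behavior, assuming a hypothetical text format `id:payload` in place of the real protobuf message: anything without a usable sequence ID is logged and skipped, and no response is written, leaving the server to time out and retry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.logging.Logger;

class DropWithoutId {
  private static final Logger LOG = Logger.getLogger(DropWithoutId.class.getName());

  // Hypothetical parse of "id:payload"; anything else has no usable ID.
  static OptionalLong parseSequenceId(String raw) {
    int colon = raw.indexOf(':');
    if (colon <= 0) {
      return OptionalLong.empty();
    }
    try {
      return OptionalLong.of(Long.parseLong(raw.substring(0, colon)));
    } catch (NumberFormatException e) {
      return OptionalLong.empty();
    }
  }

  // Only messages with an ID get a response; the rest are logged and dropped.
  static List<String> handle(List<String> rawMessages) {
    List<String> responses = new ArrayList<>();
    for (String raw : rawMessages) {
      OptionalLong id = parseSequenceId(raw);
      if (!id.isPresent()) {
        LOG.warning("Dropping message without a sequence ID: " + raw);
        continue;
      }
      responses.add(id.getAsLong() + ":ok");
    }
    return responses;
  }
}
```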


http://gerrit.cloudera.org:8080/#/c/14329/1/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/1/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@160
PS1, Line 160:
> The given BufferedInputStream (or BufferedOutputStream) can be different ea
Wouldn't it still be better to instantiate a separate MessageProcessor for each 
IO stream?
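A sketch of what binding the streams at construction could look like. `BoundMessageProcessor` is hypothetical, and the 4-byte big-endian length prefix is an assumed framing rather than a statement about the actual protocol in this change.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Each instance is tied to one pair of streams, so callers cannot pass a
// different stream on every call.
class BoundMessageProcessor {
  private final DataInputStream in;
  private final DataOutputStream out;

  BoundMessageProcessor(InputStream in, OutputStream out) {
    this.in = new DataInputStream(new BufferedInputStream(in));
    this.out = new DataOutputStream(new BufferedOutputStream(out));
  }

  // Assumed framing: 4-byte big-endian size, then the payload bytes.
  byte[] readMessage() throws IOException {
    int size = in.readInt();
    if (size < 0) {
      throw new IOException("negative message size: " + size);
    }
    byte[] body = new byte[size];
    in.readFully(body);
    return body;
  }

  void writeMessage(byte[] body) throws IOException {
    out.writeInt(body.length);
    out.write(body);
    out.flush();
  }
}
```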


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@75
PS4, Line 75:       return data;
> nit: don't need the local variable -- just:
or just `return "";`


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@148
PS4, Line 148:   private void writeBinaryMessage(BufferedOutputStream out,
This should be synchronized. Alternatively, and probably a better approach, we 
could have two queues: one for input and one for output.
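A sketch of the `synchronized` option, assuming every caller shares one writer instance and an assumed 4-byte length prefix: holding the monitor across the size and the body means two threads can never interleave the bytes of different messages. If separate instances shared one stream, they would need to lock on the stream itself instead.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class SynchronizedWriter {
  private final DataOutputStream out;

  SynchronizedWriter(OutputStream out) {
    this.out = new DataOutputStream(new BufferedOutputStream(out));
  }

  // One caller at a time writes a complete frame: size, body, flush.
  synchronized void writeBinaryMessage(byte[] body) throws IOException {
    out.writeInt(body.length); // assumed 4-byte big-endian size prefix
    out.write(body);
    out.flush();
  }
}
```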

We could have a MessageReader that reads from the InputStream and puts the 
messages on the input queue, a MessageProcessor that processes the messages and 
puts the responses on the output queue, and a MessageWriter that takes the 
responses from the output queue and writes them to the output stream. This way 
we can have a thread pool of MessageProcessors doing the actual work, and a 
single MessageReader and a single MessageWriter taking care of IO on the pipe.
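A rough sketch of that pipeline, under these assumptions: newline-delimited text stands in for the binary protobuf messages, upper-casing stands in for real processing, and the stop marker and `Pipeline.run()` entry point are hypothetical. One reader thread and one writer thread own the two ends of the pipe, and a fixed thread pool does the work in between.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Pipeline {
  // Hypothetical stop marker; real code would use a proper shutdown signal.
  private static final String POISON = "\u0000POISON";

  static void run(InputStream rawIn, OutputStream rawOut, int workers)
      throws InterruptedException {
    BlockingQueue<String> inbound = new LinkedBlockingQueue<>();
    BlockingQueue<String> outbound = new LinkedBlockingQueue<>();
    BufferedReader in = new BufferedReader(new InputStreamReader(rawIn, StandardCharsets.UTF_8));
    Writer out = new BufferedWriter(new OutputStreamWriter(rawOut, StandardCharsets.UTF_8));

    // "MessageReader": the only thread that touches the input stream.
    Thread reader = new Thread(() -> {
      try {
        String line;
        while ((line = in.readLine()) != null) {
          inbound.offer(line); // unbounded queue, so offer() always succeeds
        }
      } catch (IOException e) {
        // real code would log; fall through so the workers still stop
      } finally {
        for (int i = 0; i < workers; i++) {
          inbound.offer(POISON); // one stop marker per worker
        }
      }
    });

    // "MessageProcessor" pool: does the work, never touches the pipe.
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int i = 0; i < workers; i++) {
      pool.execute(() -> {
        try {
          for (String m = inbound.take(); !m.equals(POISON); m = inbound.take()) {
            outbound.offer(m.toUpperCase(Locale.ROOT)); // stand-in for real work
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    // "MessageWriter": the only thread that touches the output stream.
    Thread writer = new Thread(() -> {
      try {
        for (String m = outbound.take(); !m.equals(POISON); m = outbound.take()) {
          out.write(m);
          out.write('\n');
        }
        out.flush();
      } catch (IOException e) {
        // real code would log
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    reader.start();
    writer.start();
    reader.join();
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES); // every worker saw its marker
    outbound.offer(POISON); // all results are queued; let the writer finish
    writer.join();
  }
}
```

Responses can leave the pool in a different order than the requests arrived, which is fine as long as each response carries its request's sequence ID.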


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@39
PS4, Line 39:  should not be called concurrently unless handled by
            :  * the caller.
> I'm finding this a bit confusing: under what scenario can run() be called c
You could instantiate a bunch of MessageProducers and run them in a thread pool.
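A sketch of that scenario, assuming several instances of a hypothetical `SharedStreamProducer` running in a thread pool and sharing one input stream, with an assumed 4-byte length prefix: here the caller handles concurrency by locking the shared stream for each whole frame, so no two producers split a message between them.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SharedStreamProducer implements Runnable {
  private final DataInputStream in; // shared by every producer
  private final BlockingQueue<byte[]> queue;

  SharedStreamProducer(DataInputStream in, BlockingQueue<byte[]> queue) {
    this.in = in;
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        byte[] frame;
        // Lock the shared stream so one producer reads a whole frame
        // (size + body) before another producer can start reading.
        synchronized (in) {
          int size = in.readInt();
          frame = new byte[size];
          in.readFully(frame);
        }
        queue.put(frame);
      }
    } catch (EOFException e) {
      // stream exhausted: this producer is done
    } catch (IOException e) {
      // real code would log
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Runs several producers concurrently over the same stream.
  static BlockingQueue<byte[]> runPool(DataInputStream in, int producers)
      throws InterruptedException {
    BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    ExecutorService pool = Executors.newFixedThreadPool(producers);
    for (int i = 0; i < producers; i++) {
      pool.execute(new SharedStreamProducer(in, queue));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    return queue;
  }
}
```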


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@70
PS4, Line 70:     try {
same as MessageConsumer


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@71
PS4, Line 71:       while (true) {
same as MessageConsumer


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@88
PS4, Line 88:             messageProcessor.writeMessage(out, response);
Same as MessageConsumer: we don't have a sequence ID, so we should just log the 
exception and drop the request.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@100
PS4, Line 100:         while (retries.getAndDecrement() > 0) {
same as MessageConsumer


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@106
PS4, Line 106:             interrupted = true;
We shouldn't retry after an interruption, because in that case the expected 
behavior is to stop gracefully. If blockingQueue.put() throws an 
InterruptedException, we should call `Thread.currentThread().interrupt()` and 
then break out of the retry loop.
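A sketch of that retry loop, with one swap: it uses the timed `offer(e, timeout, unit)` in place of `put()` so that there is something (a timeout) worth retrying. The `putWithRetries()` helper and the 3-attempt, 10 ms budget are hypothetical. A timeout is retried, but an `InterruptedException` restores the flag and breaks out immediately.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class RetryPut {
  static final int MAX_RETRIES = 3;        // hypothetical budget
  static final long TIMEOUT_MS = 10;       // hypothetical per-attempt timeout

  // Returns true if the message was queued.
  static boolean putWithRetries(BlockingQueue<String> queue, String msg) {
    int retries = MAX_RETRIES; // local budget for this message only
    while (retries-- > 0) {
      try {
        if (queue.offer(msg, TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
          return true;
        }
        // timed out: fall through and retry
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag for the caller
        break;                              // and do not retry
      }
    }
    return false;
  }
}
```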



--
To view, visit http://gerrit.cloudera.org:8080/14329

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982
Gerrit-Change-Number: 14329
Gerrit-PatchSet: 4
Gerrit-Owner: Hao Hao
Gerrit-Reviewer: Adar Dembo
Gerrit-Reviewer: Andrew Wong
Gerrit-Reviewer: Attila Bukor
Gerrit-Reviewer: Grant Henke
Gerrit-Reviewer: Hao Hao
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Comment-Date: Tue, 19 Nov 2019 16:04:09 +0000
Gerrit-HasComments: Yes
