Hao Hao has posted comments on this change. ( http://gerrit.cloudera.org:8080/14329 )

Change subject: [java] KUDU-2791: process communicates via protobuf-based protocol
......................................................................


Patch Set 5:

(36 comments)

http://gerrit.cloudera.org:8080/#/c/14329/4//COMMIT_MSG
Commit Message:

PS4:
> Could you describe the expectations w.r.t error handling? What kind of "err
Done


http://gerrit.cloudera.org:8080/#/c/14329/4//COMMIT_MSG@22
PS4, Line 22:  reading/writing to the pipe
> Seems you also have do update the SubprocessConfiguration
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/EchoProtocolProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/EchoProtocolProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/EchoProtocolProcessor.java@29
PS4, Line 29: @InterfaceAudience.Private
> I like the use of generics here. Could we extend it even further so that we
I don't quite understand what your proposal would look like. Do you mean having a
different Java program for each protocol type, with the C++ side launching the
subprocess based on a configuration? In that case, would we need to maintain
multiple jars?


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/EchoProtocolProcessor.java@32
PS4, Line 32:   private static final String ECHO_MESSAGE = "
> nit: final?
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@35
PS4, Line 35:
            :
            :
            :
            :
> nit: maybe note the concurrency expectations of this method, e.g. that we c
Removed BufferedOutputStream in this class.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@47
PS4, Line 47:
> yeah it should be local non-atomic so we retry 3 times for each message ins
Done
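The per-message retry discussed above could look roughly like the following sketch. `RetryingWriter`, `IOAction`, and `MAX_ATTEMPTS` are hypothetical names, and reading "retry 3 times" as three attempts in total is an assumption. The point is that the attempt counter is a local variable, so each message gets a fresh budget and no atomic type is needed even when several threads call the method.

```java
import java.io.IOException;

/** Hypothetical helper; not the actual Kudu implementation. */
final class RetryingWriter {
  // Assumption: "retry 3 times" is read as at most three attempts in total.
  static final int MAX_ATTEMPTS = 3;

  /** An I/O step that may fail, e.g. writing one response to the pipe. */
  interface IOAction {
    void run() throws IOException;
  }

  /**
   * Runs the action until it succeeds or MAX_ATTEMPTS is reached, and returns
   * the number of attempts used. The counter is local, so each call (that is,
   * each message) starts with a fresh budget and needs no synchronization.
   */
  static int runWithRetry(IOAction action) throws IOException {
    int attempts = 0;
    while (true) {
      attempts++;
      try {
        action.run();
        return attempts;
      } catch (IOException e) {
        if (attempts >= MAX_ATTEMPTS) {
          throw e; // give up on this message and surface the last failure
        }
      }
    }
  }

  private RetryingWriter() {}
}
```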


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@67
PS4, Line 67:
> we should probably split this method into 3 methods: take from queue, proce
Done
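One way to read the suggestion to split the loop body into three methods is sketched below, assuming raw byte arrays on an in-memory queue and an echo-style `process` step; `SplitConsumer` and its method names are hypothetical. Keeping take, process, and write separate lets each step get its own error handling, for example retrying only the write.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;

/** Hypothetical consumer that splits one iteration into three steps. */
final class SplitConsumer {
  private final BlockingQueue<byte[]> queue;
  private final OutputStream out;

  SplitConsumer(BlockingQueue<byte[]> queue, OutputStream out) {
    this.queue = queue;
    this.out = out;
  }

  /** Step 1: block until a request is available. */
  byte[] take() throws InterruptedException {
    return queue.take();
  }

  /** Step 2: turn a request into a response (an echo, as an assumption). */
  byte[] process(byte[] request) {
    return request.clone();
  }

  /** Step 3: write the response and flush it to the underlying stream. */
  void write(byte[] response) throws IOException {
    out.write(response);
    out.flush();
  }

  /** Handles exactly one message by chaining the three steps. */
  void consumeOne() throws InterruptedException, IOException {
    write(process(take()));
  }
}
```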


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@69
PS4, Line 69:
> you can use try with resources here. If you do `try (BufferedOutputStream o
Done
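For reference, the try-with-resources form of the suggestion closes the stream automatically, even if the body throws. A minimal sketch follows; `TryWithResourcesExample.writeAll` is a hypothetical name. One caveat: closing a `BufferedOutputStream` flushes it and then also closes the wrapped stream, which would close `System.out` if that is what it wraps.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class TryWithResourcesExample {
  /**
   * Writes the payload through a BufferedOutputStream. The try-with-resources
   * statement calls close() on exit, which flushes the buffer and then closes
   * the wrapped stream as well.
   */
  static void writeAll(OutputStream sink, byte[] payload) throws IOException {
    try (BufferedOutputStream out = new BufferedOutputStream(sink)) {
      out.write(payload);
    } // no explicit flush() needed: close() flushes first
  }

  private TryWithResourcesExample() {}
}
```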


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@70
PS4, Line 70:
> this should be `while (!Thread.currentThread().isInterrupted())` instead. T
As mentioned below, we want to retry on InterruptedException when taking
elements from the blocking queue, so I don't think `while
(!Thread.currentThread().isInterrupted())` is appropriate.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@77
PS4, Line 77:
> We shouldn't catch this here, we don't want to retry anything if the thread
We are taking elements from a blocking queue, so we want to retry when an
interruption is detected (there are no places in the code that intentionally
interrupt the thread) rather than catch it and return immediately.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@94
PS4, Line 94:
> I think we shouldn't respond here as we don't have a sequence ID so the ser
A message without a sequence ID should be handled by the server (the C++ side);
the original request will time out, as you suggested. I still think it is good
to at least return an error message to the server to indicate that something
went wrong.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageConsumer.java@115
PS4, Line 115:
             :
             :
             :
             :
> This never gets reset to false, even if we were eventually table to take fr
Hmm, `interrupted` is initialized to false and is only set to true if an
InterruptedException is caught. As 'Java Concurrency in Practice' notes in
Chapter 7 (p. 143), it is good practice to 'Restore the interruption status so
that code higher up on the call stack can deal with it'.
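The retry-then-restore approach described above matches the idiom Java Concurrency in Practice describes for tasks that do not support cancellation: swallow `InterruptedException` while retrying, remember that it happened, and re-assert the interrupt flag before returning. A minimal sketch with the hypothetical name `takeUninterruptibly` (Guava's `Uninterruptibles.takeUninterruptibly` follows the same idea):

```java
import java.util.concurrent.BlockingQueue;

final class Interruptions {
  /**
   * Takes an element, retrying if the wait is interrupted. The interrupt is not
   * lost: it is re-asserted on the current thread before returning, so callers
   * higher up the stack can still observe and handle it.
   */
  static <T> T takeUninterruptibly(BlockingQueue<T> queue) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return queue.take();
        } catch (InterruptedException e) {
          interrupted = true; // remember the interrupt and retry the take
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // restore the interrupt status
      }
    }
  }

  private Interruptions() {}
}
```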


http://gerrit.cloudera.org:8080/#/c/14329/1/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/1/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@160
PS1, Line 160:
> Wouldn't it be still better to instantiate a separate MessageProcessor for
Yeah, I agree after thinking a bit more. Updated it.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@39
PS4, Line 39: class MessageProcessor {
> Why does this need to be a class? Couldn't it be a set of utility functions
I decided to wrap the input stream and output stream as MessageProcessor class
members, as you suggested previously, so I kept it as a class.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@40
PS4, Line 40:
            :   private long maxMessageBytes;
            :   private BufferedInputStream in;
            :   private BufferedOutputStream out;
            :
            :   MessageProcessor(long maxMessageBytes) {
            :     this.maxMessageBytes = maxMessageBytes;
            :   }
> Remove this?
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@50
PS4, Line 50:                    BufferedInputStream
> nit: probably don't need to define a constant for this
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@75
PS4, Line 75:
> or just `return "";`
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@93
PS4, Line 93:     // Read four bytes of the message to get the size of the body.
            :     byte[] sizeBytes = new byte[Integer.BYTES];
            :     int read = in.read(sizeBytes, 0, Integer.BYTES);
            :
> Why have this when writeBinaryMessage already exists?
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@113
PS4, Line 113:    * stream. The write is atomic, that is if any exceptions occur, no
             :    * partial message should be writte
> nit: maybe note that this isn't threadsafe and that we expect there to be a
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@126
PS4, Line 126:     // Always do a flush after write to ensure no partial message is written.
             :     out.flush();
             :   }
             :
             :   /**
> Can we also include 'read' and 'size' in the error messages to facilitate d
Done
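A sketch of reading one length-prefixed message with the `read` and `size` values included in the error messages. It assumes a 4-byte big-endian size prefix (interpreted as unsigned) followed by the body; `FramedReader` and `readMessage` are hypothetical names. Note that a single `InputStream.read(buf, off, len)` call may return fewer bytes than requested even before end of stream, so the helper loops until the buffer is full or the stream ends.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

/** Hypothetical reader for size-prefixed messages; not the Kudu implementation. */
final class FramedReader {
  /** Reads one message: a 4-byte big-endian size (assumed), then the body. */
  static byte[] readMessage(InputStream in, long maxMessageBytes) throws IOException {
    byte[] sizeBytes = new byte[Integer.BYTES];
    int read = readFully(in, sizeBytes);
    if (read != Integer.BYTES) {
      throw new IOException(String.format(
          "unable to read message size: read %d of %d bytes", read, Integer.BYTES));
    }
    // Treat the prefix as unsigned so large values are not misread as negative.
    long size = Integer.toUnsignedLong(ByteBuffer.wrap(sizeBytes).getInt());
    if (size > Math.min(maxMessageBytes, Integer.MAX_VALUE)) {
      throw new IOException(String.format(
          "message size %d exceeds the maximum of %d bytes", size, maxMessageBytes));
    }
    byte[] body = new byte[(int) size];
    read = readFully(in, body);
    if (read != size) {
      throw new IOException(String.format(
          "unable to read message body: read %d of %d bytes", read, size));
    }
    return body;
  }

  /** Loops because read() may return fewer bytes than requested. */
  private static int readFully(InputStream in, byte[] buf) throws IOException {
    int total = 0;
    while (total < buf.length) {
      int n = in.read(buf, total, buf.length - total);
      if (n < 0) {
        break; // end of stream
      }
      total += n;
    }
    return total;
  }

  private FramedReader() {}
}
```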


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@148
PS4, Line 148:    */
> this should be synchronized. Alternatively (probably a better approach) wou
The write is synchronized in BufferedOutputStream.

I don't see a compelling reason to have two blocking queues. I don't expect much
performance impact from the current approach, since we are using a pipe (which
should be fast). But let me know if you think otherwise. Thanks!
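On the synchronization point: in OpenJDK each individual `BufferedOutputStream.write` call is thread-safe, but a message written as two calls (size prefix, then body) can still interleave with another thread's write between those calls. A minimal sketch that makes the whole message one critical section, assuming the same 4-byte big-endian prefix; `FramedWriter` is a hypothetical name.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/** Hypothetical writer for size-prefixed messages; not the Kudu implementation. */
final class FramedWriter {
  private final OutputStream out;

  FramedWriter(OutputStream out) {
    this.out = out;
  }

  /**
   * Writes a 4-byte big-endian size prefix (assumed format), the body, and
   * flushes. The method is synchronized so that concurrent callers cannot
   * interleave one message's prefix with another message's body.
   */
  synchronized void writeMessage(byte[] body) throws IOException {
    byte[] prefix = ByteBuffer.allocate(Integer.BYTES).putInt(body.length).array();
    out.write(prefix);
    out.write(body);
    out.flush(); // push the complete message to the pipe
  }
}
```

The alternative raised in the review, a second blocking queue drained by a single writer thread, would avoid the lock by giving the stream exactly one writer.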


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@39
PS4, Line 39:
            :
> you could instantiate a bunch of MessageProducers in a thread pool.
Yes, as Attila suggested.
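A sketch of running several producers in a fixed-size thread pool, each with its own instance and all sharing one queue. `ProducerPool`, `Producer`, and `runProducers` are hypothetical; a real producer would read from the input pipe rather than generate integers. Giving every thread its own instance keeps per-instance state unshared, so only the `BlockingQueue`, which is already thread-safe, is touched by multiple threads.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ProducerPool {
  /** Hypothetical producer; each thread gets its own instance. */
  static final class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final int count;

    Producer(BlockingQueue<Integer> queue, int count) {
      this.queue = queue;
      this.count = count;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < count; i++) {
          queue.put(i); // blocks if the queue is bounded and full
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // stop producing and keep the status
      }
    }
  }

  /** Runs `threads` producers concurrently and waits for them to finish. */
  static boolean runProducers(int threads, int perProducer, BlockingQueue<Integer> queue)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      for (int t = 0; t < threads; t++) {
        pool.execute(new Producer(queue, perProducer)); // separate instance per task
      }
    } finally {
      pool.shutdown(); // accept no new tasks; already-submitted ones still run
    }
    return pool.awaitTermination(30, TimeUnit.SECONDS);
  }

  private ProducerPool() {}
}
```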


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@70
PS4, Line 70:
> same as MessageConsumer
Ack


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@71
PS4, Line 71:
> same as MessageConsumer
Ack


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@88
PS4, Line 88:
> same as MessageConsumer, we don't have a sequence ID so we should just log
Ack


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@98
PS4, Line 98:
> Why aren't we responding with an error here?
I added a warning, as I don't think it is necessarily an error (the message can
still be parsed). But let me know if you think otherwise.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@100
PS4, Line 100:
> same as MessageConsumer
Ack


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProducer.java@106
PS4, Line 106:
> we shouldn't retry after an interruption, as in this case the expected beha
Ack
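For contrast with the retrying consumer, the producer behavior suggested here (stop rather than retry once interrupted) is the usual cancellation-responsive loop. A minimal sketch with the hypothetical names `CancellableLoop` and `drainUntilInterrupted`: the interrupt flag is re-asserted in the `catch` so the loop condition sees it and the thread exits.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

final class CancellableLoop {
  /**
   * Moves elements from the queue to the sink until the thread is interrupted.
   * take() clears the interrupt flag when it throws, so the catch block sets it
   * again; the while condition then observes it and the loop ends.
   */
  static void drainUntilInterrupted(BlockingQueue<String> queue, List<String> sink) {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        sink.add(queue.take());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve status; do not retry
      }
    }
  }

  private CancellableLoop() {}
}
```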


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java@37
PS4, Line 37: SubprocessResponseP
> SubprocessResponsePB
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolProcessor.java@64
PS4, Line 64:   abstract Class<Request> getRequestClazz();
> Will this always be:
I don't think you can get the class literal of a generic type parameter (it is
erased at runtime), so 'return Request.class;' is not possible.
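Because generic type parameters are erased, `Request.class` does not compile inside the generic base class; the common workaround is for each concrete subclass to return its own class literal. A sketch with hypothetical stand-ins (`ProtocolHandler`, `EchoRequest`, and `EchoHandler` replace the real processor and protobuf-generated classes):

```java
/** Generic base class; `Request.class` would be a compile error here. */
abstract class ProtocolHandler<Request> {
  /** Each concrete subclass supplies the class object for its request type. */
  abstract Class<Request> getRequestClazz();

  /** Example use of the token: a runtime-checked cast from an untyped message. */
  Request cast(Object message) {
    return getRequestClazz().cast(message);
  }
}

/** Hypothetical stand-in for a protobuf-generated request message. */
final class EchoRequest {
  final String data;

  EchoRequest(String data) {
    this.data = data;
  }
}

/** Concrete handler: here the type is known, so the class literal is legal. */
final class EchoHandler extends ProtocolHandler<EchoRequest> {
  @Override
  Class<EchoRequest> getRequestClazz() {
    return EchoRequest.class;
  }
}
```

Another option is to pass a `Class<Request>` into the base-class constructor; which reads better is largely a style choice.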


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java@95
PS4, Line 95: }
> I think this should be long just in case. In the C++ side we use uint but a
Done
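Java has no unsigned integer types, so if the C++ side stores the limit as a 32-bit unsigned value (an assumption; the quoted comment is cut off), values above `Integer.MAX_VALUE` only fit in a `long`. A sketch of validating such a setting; `MessageSizeConfig` and `parseMaxMessageBytes` are hypothetical names, and the lower bound of 1 is also assumed.

```java
final class MessageSizeConfig {
  /** Largest value a 32-bit unsigned integer can hold (2^32 - 1). */
  static final long MAX_UINT32 = 0xFFFFFFFFL;

  /**
   * Parses a max-message-size setting into a long. Integer.parseInt would
   * reject anything above 2,147,483,647, while a uint32 goes up to 4,294,967,295.
   */
  static long parseMaxMessageBytes(String value) {
    long bytes = Long.parseLong(value.trim());
    if (bytes < 1 || bytes > MAX_UINT32) { // lower bound of 1 is an assumption
      throw new IllegalArgumentException(
          "max message bytes must be in [1, " + MAX_UINT32 + "], got " + bytes);
    }
    return bytes;
  }

  private MessageSizeConfig() {}
}
```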


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java
File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java@63
PS4, Line 63:     ExecutorService producerService = Executors.newSingleThreadExecutor();
> maybe we can use try with resources here and then join the threads at the e
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java@80
PS4, Line 80:
> there should be a separate consumer instance for each thread.
I don't see why it is necessary to have a separate consumer instance for each
thread.


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMain.java@88
PS4, Line 88:         consumerFuture.exceptionally(errorHandler);
> don't think these are needed, shutdown only means the executor service won'
I agree this is not needed, since each CompletableFuture has error handling for
any exceptions. I don't think shutdownNow() is needed either. Let me know if
you think I missed anything.
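A sketch of the structure discussed here: each task runs via `CompletableFuture.runAsync` on an executor with its own `exceptionally` handler, and the main thread joins on all of them. `FutureRunner.runAll` and its error-collecting handler are hypothetical. `shutdown()` only stops the executor from accepting new tasks; it is called here so that this standalone example's pool threads exit once the work is done.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FutureRunner {
  /**
   * Runs every task asynchronously, records the message of any failure, and
   * waits for all tasks. Returns the collected error messages.
   */
  static List<String> runAll(List<Runnable> tasks) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
    List<String> errors = Collections.synchronizedList(new ArrayList<>());
    try {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (Runnable task : tasks) {
        futures.add(CompletableFuture.runAsync(task, pool)
            .exceptionally(t -> {
              // runAsync wraps the task's exception in a CompletionException.
              Throwable cause = t.getCause() != null ? t.getCause() : t;
              errors.add(cause.getMessage());
              return null;
            }));
      }
      // Each future already handles its own failure, so join() does not throw here.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
    } finally {
      pool.shutdown(); // no new tasks; lets the pool threads exit
    }
    return errors;
  }

  private FutureRunner() {}
}
```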


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java
File java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java@42
PS4, Line 42: @RunWith(Parameterized.class)
            : public class TestMessageProcessor e
> There's only one serialization mode now.
Done


http://gerrit.cloudera.org:8080/#/c/14329/4/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageProcessor.java@78
PS4, Line 78:
> this should be in its own class, but as Andrew said, we probably don't need
Done



--
To view, visit http://gerrit.cloudera.org:8080/14329

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982
Gerrit-Change-Number: 14329
Gerrit-PatchSet: 5
Gerrit-Owner: Hao Hao <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Grant Henke <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Comment-Date: Mon, 09 Dec 2019 17:40:39 +0000
Gerrit-HasComments: Yes
