Attila Bukor has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/14329 )

Change subject: [java] KUDU-2971: process communicates via protobuf-based 
protocol
......................................................................


Patch Set 8:

(4 comments)

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@72
PS7, Line 72:           String.format("message size (%d) exceeds maximum 
message size (%d)",
            :                         size, maxMessageBytes));
            :     }
> Makes sense, updated.
As InputStream doesn't throw InterruptedException when it's interrupted (it 
will just block continuously), I would suggest checking availability and if 
there's nothing on the stream, sleep a bit and check again. The problem with 
this approach is that that if we choose a low value, we'll be busy waiting, but 
if it's too large, then we introduce latency. Maybe make the sleep 
configurable? What do you think?

Another (hacky) approach would be to send SIGINT then write a GOODBYE message 
to the stream to unblock?


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java:

http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@84
PS7, Line 84:         // InterruptedException during the put, record the 
interruption
> My understanding of InterruptedException is to provide the blocking method
If the thread is interrupted, we should terminate. If the blocking queue is 
full, there's no guarantee it will ever be consumed and that the put can 
succeed and we still want to terminate.


http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@98
PS7, Line 98:             }
> Hmm, no, Thread.currentThread().isInterrupted() is actually checking the in
we still only interrupt the thread in finally which is basically never reached 
unless we receive an unchecked exception or when we throw 
KuduSubprocessException. We should simply Thread.currentThread().interrupt() 
here and remove the finally block.


http://gerrit.cloudera.org:8080/#/c/14329/8/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
File 
java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java:

http://gerrit.cloudera.org:8080/#/c/14329/8/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@79
PS8, Line 79:         if (injectInterrupt) {
> Here we want to test InterruptionException thrown for blockingQueue.put(dat
I'm not sure I understand, the thread would be blocking on either 
messageProcessor.readBytes() or blockingQueue.put() anyway. If you use a mock 
blocking queue or a full real blocking queue in the test and you have something 
to read in the input stream, then interrupt the thread, blockingQueue will 
throw InterruptedException.



--
To view, visit http://gerrit.cloudera.org:8080/14329
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982
Gerrit-Change-Number: 14329
Gerrit-PatchSet: 8
Gerrit-Owner: Hao Hao <[email protected]>
Gerrit-Reviewer: Adar Dembo <[email protected]>
Gerrit-Reviewer: Andrew Wong <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Grant Henke <[email protected]>
Gerrit-Reviewer: Hao Hao <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Comment-Date: Tue, 14 Jan 2020 17:55:42 +0000
Gerrit-HasComments: Yes

Reply via email to