Attila Bukor has posted comments on this change. ( http://gerrit.cloudera.org:8080/14329 )
Change subject: [java] KUDU-2971: process communicates via protobuf-based protocol ...................................................................... Patch Set 8: (4 comments) http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java: http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageProcessor.java@72 PS7, Line 72: String.format("message size (%d) exceeds maximum message size (%d)", : size, maxMessageBytes)); : } > Makes sense, updated. As InputStream doesn't throw InterruptedException when it's interrupted (it will just block continuously), I would suggest checking availability and if there's nothing on the stream, sleep a bit and check again. The problem with this approach is that that if we choose a low value, we'll be busy waiting, but if it's too large, then we introduce latency. Maybe make the sleep configurable? What do you think? Another (hacky) approach would be to send SIGINT then write a GOODBYE message to the stream to unblock? http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java: http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@84 PS7, Line 84: // InterruptedException during the put, record the interruption > My understanding of InterruptedException is to provide the blocking method If the thread is interrupted, we should terminate. If the blocking queue is full, there's no guarantee it will ever be consumed and that the put can succeed and we still want to terminate. http://gerrit.cloudera.org:8080/#/c/14329/7/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@98 PS7, Line 98: } > Hmm, no, Thread.currentThread().isInterrupted() is actually checking the in we still only interrupt the thread in finally which is basically never reached unless we receive an unchecked exception or when we throw KuduSubprocessException. We should simply Thread.currentThread().interrupt() here and remove the finally block. http://gerrit.cloudera.org:8080/#/c/14329/8/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java File java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java: http://gerrit.cloudera.org:8080/#/c/14329/8/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java@79 PS8, Line 79: if (injectInterrupt) { > Here we want to test InterruptionException thrown for blockingQueue.put(dat I'm not sure I understand, the thread would be blocking on either messageProcessor.readBytes() or blockingQueue.put() anyway. If you use a mock blocking queue or a full real blocking queue in the test and you have something to read in the input stream, then interrupt the thread, blockingQueue will throw InterruptedException. -- To view, visit http://gerrit.cloudera.org:8080/14329 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: kudu Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Iaf9ad24dbc9acc681284b6433836271b5b4c7982 Gerrit-Change-Number: 14329 Gerrit-PatchSet: 8 Gerrit-Owner: Hao Hao <[email protected]> Gerrit-Reviewer: Adar Dembo <[email protected]> Gerrit-Reviewer: Andrew Wong <[email protected]> Gerrit-Reviewer: Attila Bukor <[email protected]> Gerrit-Reviewer: Grant Henke <[email protected]> Gerrit-Reviewer: Hao Hao <[email protected]> Gerrit-Reviewer: Kudu Jenkins (120) Gerrit-Comment-Date: Tue, 14 Jan 2020 17:55:42 +0000 Gerrit-HasComments: Yes
