Revision: 4905
http://sourceforge.net/p/vexi/code/4905
Author: mkpg2
Date: 2016-11-24 00:16:42 +0000 (Thu, 24 Nov 2016)
Log Message:
-----------
vexi.stream.pipe() cancellation support.
- The 3rd arg 'listener' gets a 'control' attribute set. Putting to the
'cancel' property of that control object interrupts the piping thread, which
stops the process.
Modified Paths:
--------------
branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java
trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java
Modified:
branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java
===================================================================
--- branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java
2016-11-23 21:33:18 UTC (rev 4904)
+++ branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java
2016-11-24 00:16:42 UTC (rev 4905)
@@ -15,6 +15,7 @@
import java.io.OutputStream;
import java.lang.ref.WeakReference;
import java.net.URL;
+import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -30,6 +31,7 @@
import org.ibex.util.Cache;
import org.ibex.util.Callable;
import org.ibex.util.Callback;
+import org.ibex.util.DefaultLog;
import org.ibex.util.IOUtil;
import org.ibex.util.Logger;
@@ -377,7 +379,19 @@
Long contentLength = info.length;
OutputStream os =f1.getOutputStream(true,
contentLength);
if(listener!=null){
+ final java.lang.Thread thread =
java.lang.Thread.currentThread();
+ listener.put(JSU.S("control"), new
JS.Immutable(){
+ public void put(JS keyJS, JS
val) throws JSExn {
+ String key =
JSU.toString(keyJS);
+
if("cancel".equals(key)){
+
thread.interrupt();
+ return;
+ }
+ super.put(keyJS, val);
+ }
+ });
+
long length =
contentLength==null?819200:contentLength;
final int callbackRate =
(int)Math.max(8192,length/100);
@@ -422,7 +436,7 @@
bufferSize =
(int)(1*contentLength);
}else{
// just increase buffer a bit
for bigger requests
- for(int i=0; i<4 &&
contentLength>8*bufferSize; i++){
+ for(int i=0; i<6 &&
contentLength>8*bufferSize; i++){
bufferSize *=2;
}
}
@@ -430,9 +444,23 @@
byte[] buffer = new byte[bufferSize];
IOUtil.pipe(f0.getInputStream(true), os,
buffer);
+
return null;
+ } catch (ClosedByInterruptException e){
+ // ok we asked for this, nothing to do (except
finally)
+ return null;
} catch (IOException e) {
+ if(java.lang.Thread.interrupted()){
+
java.lang.Thread.currentThread().interrupt();
+ DefaultLog.logger.info(Fountain.class,
"Interupted IO complained: "+e.getMessage());
+ return null;
+ }
+ java.lang.Thread.currentThread().interrupt();
throw JSU.handleFountainExn(e);
+ } finally{
+ if(java.lang.Thread.interrupted()){
+
sched.schedulePutAndTriggerTraps(listener, JSU.S("cancel"), JSU.T);
+ }
}
}
});
Modified: trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java
===================================================================
--- trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java
2016-11-23 21:33:18 UTC (rev 4904)
+++ trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java
2016-11-24 00:16:42 UTC (rev 4905)
@@ -48,16 +48,23 @@
out.close();
}
}
+
- public static void pipe(InputStream in, OutputStream out, byte[]
workspace) throws IOException {
+ static public void pipe(InputStream in, OutputStream out, byte[]
workspace) throws IOException {
try{
while (true) {
+ if(Thread.interrupted()){
+ Thread.currentThread().interrupt();
+ in.close();
+ break;
+ }
+
int bytesRead = in.read(workspace);
if (bytesRead == -1) break;
out.write(workspace, 0, bytesRead);
}
}finally{
- out.close();
+ out.close();
}
}
}
This was sent by the SourceForge.net collaborative development platform, the
world's largest Open Source development site.
------------------------------------------------------------------------------
_______________________________________________
Vexi-svn mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/vexi-svn