Revision: 4905 http://sourceforge.net/p/vexi/code/4905 Author: mkpg2 Date: 2016-11-24 00:16:42 +0000 (Thu, 24 Nov 2016) Log Message: ----------- vexi.stream.pipe() cancellation support. - The 3rd argument, 'listener', gets a 'control' attribute set on it. Putting to the 'cancel' property of that control object interrupts the thread running the pipe, which stops the transfer.
Modified Paths: -------------- branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java Modified: branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java =================================================================== --- branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java 2016-11-23 21:33:18 UTC (rev 4904) +++ branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java 2016-11-24 00:16:42 UTC (rev 4905) @@ -15,6 +15,7 @@ import java.io.OutputStream; import java.lang.ref.WeakReference; import java.net.URL; +import java.nio.channels.ClosedByInterruptException; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -30,6 +31,7 @@ import org.ibex.util.Cache; import org.ibex.util.Callable; import org.ibex.util.Callback; +import org.ibex.util.DefaultLog; import org.ibex.util.IOUtil; import org.ibex.util.Logger; @@ -377,7 +379,19 @@ Long contentLength = info.length; OutputStream os =f1.getOutputStream(true, contentLength); if(listener!=null){ + final java.lang.Thread thread = java.lang.Thread.currentThread(); + listener.put(JSU.S("control"), new JS.Immutable(){ + public void put(JS keyJS, JS val) throws JSExn { + String key = JSU.toString(keyJS); + if("cancel".equals(key)){ + thread.interrupt(); + return; + } + super.put(keyJS, val); + } + }); + long length = contentLength==null?819200:contentLength; final int callbackRate = (int)Math.max(8192,length/100); @@ -422,7 +436,7 @@ bufferSize = (int)(1*contentLength); }else{ // just increase buffer a bit for bigger requests - for(int i=0; i<4 && contentLength>8*bufferSize; i++){ + for(int i=0; i<6 && contentLength>8*bufferSize; i++){ bufferSize *=2; } } @@ -430,9 +444,23 @@ byte[] buffer = new byte[bufferSize]; IOUtil.pipe(f0.getInputStream(true), os, buffer); + return null; + } catch (ClosedByInterruptException e){ + // ok we asked for this, nothing to 
do (except finally) + return null; } catch (IOException e) { + if(java.lang.Thread.interrupted()){ + java.lang.Thread.currentThread().interrupt(); + DefaultLog.logger.info(Fountain.class, "Interupted IO complained: "+e.getMessage()); + return null; + } + java.lang.Thread.currentThread().interrupt(); throw JSU.handleFountainExn(e); + } finally{ + if(java.lang.Thread.interrupted()){ + sched.schedulePutAndTriggerTraps(listener, JSU.S("cancel"), JSU.T); + } } } }); Modified: trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java =================================================================== --- trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java 2016-11-23 21:33:18 UTC (rev 4904) +++ trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java 2016-11-24 00:16:42 UTC (rev 4905) @@ -48,16 +48,23 @@ out.close(); } } + - public static void pipe(InputStream in, OutputStream out, byte[] workspace) throws IOException { + static public void pipe(InputStream in, OutputStream out, byte[] workspace) throws IOException { try{ while (true) { + if(Thread.interrupted()){ + Thread.currentThread().interrupt(); + in.close(); + break; + } + int bytesRead = in.read(workspace); if (bytesRead == -1) break; out.write(workspace, 0, bytesRead); } }finally{ - out.close(); + out.close(); } } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. ------------------------------------------------------------------------------ _______________________________________________ Vexi-svn mailing list Vexi-svn@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/vexi-svn