Revision: 4905
          http://sourceforge.net/p/vexi/code/4905
Author:   mkpg2
Date:     2016-11-24 00:16:42 +0000 (Thu, 24 Nov 2016)
Log Message:
-----------
vexi.stream.pipe() cancellation support.
- 3rd arg 'listener' gets 'control' attribute set. Putting to the cancel 
property on that stops the process.

Modified Paths:
--------------
    branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java
    trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java

Modified: 
branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java
===================================================================
--- branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java  
2016-11-23 21:33:18 UTC (rev 4904)
+++ branches/vexi3/org.vexi-library.js/src/main/java/org/ibex/js/Fountain.java  
2016-11-24 00:16:42 UTC (rev 4905)
@@ -15,6 +15,7 @@
 import java.io.OutputStream;
 import java.lang.ref.WeakReference;
 import java.net.URL;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -30,6 +31,7 @@
 import org.ibex.util.Cache;
 import org.ibex.util.Callable;
 import org.ibex.util.Callback;
+import org.ibex.util.DefaultLog;
 import org.ibex.util.IOUtil;
 import org.ibex.util.Logger;
 
@@ -377,7 +379,19 @@
                                Long contentLength = info.length;
                                OutputStream os =f1.getOutputStream(true, 
contentLength);
                                if(listener!=null){
+                                       final java.lang.Thread thread = 
java.lang.Thread.currentThread();
+                                       listener.put(JSU.S("control"), new 
JS.Immutable(){
+                                               public void put(JS keyJS, JS 
val) throws JSExn {
+                                                       String key = 
JSU.toString(keyJS);
+                                                       
if("cancel".equals(key)){
+                                                               
thread.interrupt();
+                                                               return;
+                                                       }
+                                                       super.put(keyJS, val);
+                                               }
+                                       });
                                        
+                                       
                                long length = 
contentLength==null?819200:contentLength;
                                final int callbackRate = 
(int)Math.max(8192,length/100);
                                
@@ -422,7 +436,7 @@
                                                bufferSize = 
(int)(1*contentLength);
                                        }else{
                                                // just increase buffer a bit 
for bigger requests
-                                               for(int i=0; i<4 && 
contentLength>8*bufferSize; i++){
+                                               for(int i=0; i<6 && 
contentLength>8*bufferSize; i++){
                                                        bufferSize *=2;
                                                }
                                        }
@@ -430,9 +444,23 @@
                                
                                byte[] buffer = new byte[bufferSize];
                                IOUtil.pipe(f0.getInputStream(true), os, 
buffer);
+                               
                                        return null;
+                       } catch (ClosedByInterruptException e){
+                               // ok we asked for this, nothing to do (except 
finally)
+                               return null;
                        } catch (IOException e) {
+                               if(java.lang.Thread.interrupted()){
+                                       
java.lang.Thread.currentThread().interrupt();
+                                       DefaultLog.logger.info(Fountain.class, 
"Interupted IO complained: "+e.getMessage());
+                                       return null;
+                               }
+                               java.lang.Thread.currentThread().interrupt();
                                throw JSU.handleFountainExn(e);
+                               } finally{
+                                       if(java.lang.Thread.interrupted()){
+                                               
sched.schedulePutAndTriggerTraps(listener, JSU.S("cancel"), JSU.T);
+                                       }
                                }
                        }
                });     

Modified: trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java
===================================================================
--- trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java 
2016-11-23 21:33:18 UTC (rev 4904)
+++ trunk/org.vexi-library.util/src/main/java/org/ibex/util/IOUtil.java 
2016-11-24 00:16:42 UTC (rev 4905)
@@ -48,16 +48,23 @@
                out.close();
        }
     }    
+
     
-    public static void pipe(InputStream in, OutputStream out, byte[] 
workspace) throws IOException {
+       static public void pipe(InputStream in, OutputStream out, byte[] 
workspace) throws IOException {
        try{
                while (true) {
+                       if(Thread.interrupted()){
+                               Thread.currentThread().interrupt();
+                               in.close();
+                               break;
+                       }
+                       
                        int bytesRead = in.read(workspace);
                        if (bytesRead == -1) break;
                        out.write(workspace, 0, bytesRead);
                }
        }finally{
-               out.close();
+                       out.close();
        }
     }    
 }

This was sent by the SourceForge.net collaborative development platform, the 
world's largest Open Source development site.


------------------------------------------------------------------------------
_______________________________________________
Vexi-svn mailing list
Vexi-svn@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/vexi-svn

Reply via email to