Why?

On Sun, Jul 16, 2006 at 03:49:12PM +0000, jflesch at freenetproject.org wrote:
> Author: jflesch
> Date: 2006-07-16 15:49:09 +0000 (Sun, 16 Jul 2006)
> New Revision: 9630
> 
> Added:
>    trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
> Modified:
>    trunk/apps/Thaw/src/thaw/core/Config.java
>    trunk/apps/Thaw/src/thaw/core/Core.java
>    trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
>    trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
>    trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
>    trunk/apps/Thaw/src/thaw/i18n/thaw.properties
>    trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
>    trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
>    trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
> Log:
> Traffic shaper (for socket output only) is now implemented + Various bug 
> fixes
> 
> Modified: trunk/apps/Thaw/src/thaw/core/Config.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/core/Config.java 2006-07-15 21:40:40 UTC (rev 
> 9629)
> +++ trunk/apps/Thaw/src/thaw/core/Config.java 2006-07-16 15:49:09 UTC (rev 
> 9630)
> @@ -297,6 +297,7 @@
>               setValue("nodePort", "9481");
>               setValue("maxSimultaneousDownloads", "-1");
>               setValue("maxSimultaneousInsertions", "-1");
> +             setValue("maxUploadSpeed", "-1");
>               setValue("thawId", "thaw_"+(new Integer((new 
> Random()).nextInt(1000))).toString());
>       }
>  
> 
> Modified: trunk/apps/Thaw/src/thaw/core/Core.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/core/Core.java   2006-07-15 21:40:40 UTC (rev 
> 9629)
> +++ trunk/apps/Thaw/src/thaw/core/Core.java   2006-07-16 15:49:09 UTC (rev 
> 9630)
> @@ -137,7 +137,8 @@
>                       }
>                       
>                       connection = new 
> FCPConnection(config.getValue("nodeAddress"),
> -                                                    (new 
> Integer(config.getValue("nodePort"))).intValue());
> +                                                    (new 
> Integer(config.getValue("nodePort"))).intValue(),
> +                                                    (new 
> Integer(config.getValue("maxUploadSpeed"))).intValue());
>                       
>                       if(!connection.connect()) {
>                               new WarningWindow(this, "Unable to connect to 
> "+config.getValue("nodeAddress")+":"+
> 
> Modified: trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java        2006-07-15 
> 21:40:40 UTC (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java        2006-07-16 
> 15:49:09 UTC (rev 9630)
> @@ -29,6 +29,7 @@
>               I18n.getMessage("thaw.config.nodePort"),
>               I18n.getMessage("thaw.config.maxSimultaneousDownloads"),
>               I18n.getMessage("thaw.config.maxSimultaneousInsertions"),
> +             I18n.getMessage("thaw.config.maxUploadSpeed"),
>               I18n.getMessage("thaw.config.thawId")
>       };
>  
> @@ -37,6 +38,7 @@
>               "nodePort",
>               "maxSimultaneousDownloads",
>               "maxSimultaneousInsertions",
> +             "maxUploadSpeed",
>               "thawId"
>       };
>  
> 
> Added: trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java       2006-07-15 
> 21:40:40 UTC (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java       2006-07-16 
> 15:49:09 UTC (rev 9630)
> @@ -0,0 +1,166 @@
> +package thaw.fcp;
> +
> +import java.net.Socket;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.io.BufferedInputStream;
> +import java.io.InputStreamReader;
> +
> +import thaw.core.Logger;
> +
> +/**
> + * Only used by FCPConnection. Except special situation, you shouldn't have 
> to use it directly.
> + * Currently only used for output. (shouldn't be really usefull for input).
> + * Some data are sent each 'INTERVAL' (in ms).
> + */
> +public class FCPBufferedStream implements Runnable {
> +     private FCPConnection connection;
> +     private int maxUploadSpeed;
> +     
> +     private byte outputBuffer[];
> +
> +     public final static int OUTPUT_BUFFER_SIZE = 102400;
> +     public final static int INTERVAL = 200;
> +
> +     private int waiting = 0; /* amount of data stored in the buffer */
> +     private int readCursor = 0; /* indicates where the nex read will be */
> +     private int writeCursor = 0; /* indicates where the next write will be 
> */
> +
> +     private Thread tractopelle = null;
> +     private boolean running = true;
> +     private int packetSize = 0;
> +
> +
> +
> +     public FCPBufferedStream(FCPConnection connection,
> +                              int maxUploadSpeed) {
> +             this.connection = connection;
> +             this.maxUploadSpeed = maxUploadSpeed;
> +             
> +             if(maxUploadSpeed >= 0) {
> +                     outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
> +                     packetSize = (maxUploadSpeed * 1024) / (1000/INTERVAL);
> +             }
> +     }
> +
> +     /**
> +      * Add to the buffer. Can block if buffer is full !
> +      * Never send more than OUTPUT_BUFFER_SIZE.
> +      */
> +     public synchronized boolean write(byte[] data) {
> +             if(maxUploadSpeed == -1) {
> +                     return connection.realRawWrite(data);
> +             }
> +
> +             while(waiting + data.length > OUTPUT_BUFFER_SIZE) {
> +                     sleep(INTERVAL);
> +             }
> +
> +             waiting += data.length;
> +
> +             for(int i = 0 ; i < data.length ; i++) {
> +                     outputBuffer[writeCursor] = data[i];
> +
> +                     writeCursor++;
> +
> +                     if(writeCursor >= OUTPUT_BUFFER_SIZE)
> +                             writeCursor = 0;
> +             }
> +
> +             return true;
> +     }
> +
> +     /**
> +      * @see write(byte[])
> +      */
> +     public boolean write(String data) {
> +             return write(data.getBytes());
> +     }
> +
> +     /**
> +      * extract from the buffer
> +      */
> +     private boolean readOutputBuffer(byte[] data) {
> +             for(int i = 0; i < data.length ; i++) {
> +                     data[i] = outputBuffer[readCursor];
> +                     
> +                     readCursor++;
> +
> +                     if(readCursor >= OUTPUT_BUFFER_SIZE)
> +                             readCursor = 0;
> +             }
> +
> +             waiting -= data.length;
> +
> +             return true;
> +     }
> +
> +     /**
> +      * wait for the buffer being empty.
> +      */
> +     public void flush() {
> +             while(waiting > 0) {
> +                     sleep(INTERVAL);
> +             }               
> +     }
> +
> +
> +     public void run() {
> +             byte[] data;
> +             
> +             while(running) { /* Wild and freeeeeee */
> +                     if(waiting > 0) {
> +                             int to_read = packetSize;
> +                             
> +                             if(waiting < to_read)
> +                                     to_read = waiting;
> +                             
> +                             data = new byte[to_read];
> +                             
> +                             readOutputBuffer(data);
> +                             
> +                             connection.realRawWrite(data);
> +                     }
> +
> +                     sleep(INTERVAL);
> +             }               
> +     }
> +
> +     /**
> +      * Start the thread sending data from the buffer to the OutputStream 
> (socket).
> +      */
> +     public boolean startSender() {
> +             running = true;
> +
> +             if(maxUploadSpeed < 0) {
> +                     Logger.notice(this, "startSender(): No upload limit. 
> Not needed");
> +                     return false;
> +             }
> +
> +             if(tractopelle == null) {
> +                     tractopelle = new Thread(this);
> +                     tractopelle.start();
> +                     return true;
> +             } else {
> +                     Logger.notice(this, "startSender(): Already started");
> +                     return false;
> +             }
> +     }
> +
> +     public boolean stopSender() {
> +             running = false;
> +             tractopelle = null;
> +             return true;
> +     }
> +
> +     /**
> +      * Just ignore the InterruptedException.
> +      */
> +     private void sleep(int ms) {
> +             try {
> +                     Thread.sleep(ms);
> +             } catch(java.lang.InterruptedException e) {
> +                     /* just iggnnnnnooored */
> +             }
> +     }
> +}
> 
> Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java    2006-07-15 21:40:40 UTC 
> (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java    2006-07-16 15:49:09 UTC 
> (rev 9630)
> @@ -162,9 +162,9 @@
>               queueManager.getQueryManager().addObserver(this);
>               
>               progress = 0;
> -             running = true;
>               finished = false;
>               successful = false;
> +             running = false;
>               
>               if(keyType == 2 && privateKey == null) {
>                       generateSSK();
> @@ -240,6 +240,8 @@
>       }
>  
>       public boolean continueInsert() {
> +             running = true; /* Here we are really running */
> +
>               FCPConnection connection = 
> queueManager.getQueryManager().getConnection();
>  
>               connection.lockWriting();
> @@ -431,10 +433,11 @@
>                               publicKey = msg.getValue("URI");
>                               publicKey = publicKey.replaceAll("freenet:", 
> "");
>  
> -                             /*
> +                             
>                               if(keyType == 0)
>                                       publicKey = publicKey + "/" +name;
>  
> +                             /*
>                               if(keyType > 0)
>                                       publicKey = publicKey + "/" + name + 
> "-" + (new Integer(rev)).toString();
>                               */
> @@ -454,7 +457,7 @@
>                               publicKey = publicKey.replaceAll("freenet:", 
> "");
>  
>                               if(keyType == 0)
> -                                     publicKey = publicKey + name;
> +                                     publicKey = publicKey + "/" + name;
>                               if(keyType == 1)
>                                       publicKey = "KSK@"+name+"-" + (new 
> Integer(rev)).toString();
>                               if(keyType == 2)
> @@ -603,20 +606,26 @@
>               setChanged();
>               notifyObservers();
>  
> -             FCPMessage msg = new FCPMessage();
> -             msg.setMessageName("RemovePersistentRequest");
> -             msg.setValue("Identifier", identifier);
> -             
> -             if(global)
> -                     msg.setValue("Global", "true");
> -             else
> -                     msg.setValue("Global", "false");
> +             if(isRunning() || isFinished()) {
> +                     FCPMessage msg = new FCPMessage();
> +                     msg.setMessageName("RemovePersistentRequest");
> +                     msg.setValue("Identifier", identifier);
> +                     
> +                     if(global)
> +                             msg.setValue("Global", "true");
> +                     else
> +                             msg.setValue("Global", "false");
> +                     
> +                     queueManager.getQueryManager().writeMessage(msg);
> +                     
> +                     running = false;
> +                     
> +                     queueManager.getQueryManager().deleteObserver(this);
> +             } else {
> +                     Logger.notice(this, "Nothing to remove");
> +             }
>  
> -             queueManager.getQueryManager().writeMessage(msg);
>  
> -             running = false;
> -
> -             queueManager.getQueryManager().deleteObserver(this);
>               return true;
>       }
>  
> 
> Modified: trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java   2006-07-15 21:40:40 UTC 
> (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java   2006-07-16 15:49:09 UTC 
> (rev 9630)
> @@ -17,9 +17,13 @@
>   * After being instanciated, you should commit it to the FCPQueryManager, 
> and then
>   * commit the FCPQueryManager to the FCPQueueManager.
>   * Call observer when connected / disconnected.
> + * WARNING: This FCP implement don't guarantee that messages are sent in the 
> same order than initally put
> + *          if the lock on writting is not set !
>   * TODO: Add functions socketToFile(long size, File file) / 
> fileToSocket(File file)
>   */
>  public class FCPConnection extends Observable {
> +     private FCPBufferedStream bufferedOut = null;
> +     private int maxUploadSpeed = 0;
>  
>       private String nodeAddress = null;
>       private int port = 0;
> @@ -41,9 +45,11 @@
>  
>       /**
>        * Don't connect. Call connect() for that.
> +      * @param maxUploadSpeed in KB: -1 means no limit
>        */
>       public FCPConnection(String nodeAddress,
> -                          int port)
> +                          int port,
> +                          int maxUploadSpeed)
>       {
>               if(DEBUG_MODE) {
>                       Logger.notice(this, "DEBUG_MODE ACTIVATED");
> @@ -51,22 +57,21 @@
>  
>               setNodeAddress(nodeAddress);
>               setNodePort(port);
> +             setMaxUploadSpeed(maxUploadSpeed);
>       }
>  
>  
> -     /**
> -      * You will probably have to use resetQueue() from the FCPQueueManager 
> after using this function.
> -      */
>       public void setNodeAddress(String nodeAddress) {
>               this.nodeAddress = nodeAddress;
>       }
>  
> -     /**
> -      * You will probably have to use resetQueue() from the FCPQueueManager 
> after using this function.
> -      */
>       public void setNodePort(int port) {
>               this.port = port;
>       }
> +
> +     public void setMaxUploadSpeed(int max) {
> +             this.maxUploadSpeed = max;
> +     }
>       
>       
>       public void disconnect() {
> @@ -83,6 +88,8 @@
>               socket = null;
>               in = null;
>               out = null;
> +             bufferedOut.stopSender();
> +             bufferedOut = null;
>  
>               setChanged();
>               notifyObservers();
> @@ -133,6 +140,8 @@
>               }
>  
>               reader = new BufferedInputStream(in);
> +             bufferedOut = new FCPBufferedStream(this, maxUploadSpeed);
> +             bufferedOut.startSender();
>  
>               Logger.info(this, "Connected");
>  
> @@ -167,6 +176,13 @@
>        * Doesn't check the lock state ! You have to manage it yourself.
>        */
>       public boolean rawWrite(byte[] data) {
> +             return bufferedOut.write(data);
> +     }
> +
> +     /**
> +      * Should be call by FCPBufferedStream. Not you.
> +      */
> +     public boolean realRawWrite(byte[] data) {
>               if(out != null && socket != null && socket.isConnected()) {
>                       try {
>                               out.write(data);
> @@ -182,11 +198,11 @@
>               return true;
>       }
>  
> -     public synchronized boolean write(String toWrite) {
> +     public boolean write(String toWrite) {
>               return write(toWrite, true);
>       }
>  
> -     public synchronized boolean write(String toWrite, boolean checkLock) {
> +     public boolean write(String toWrite, boolean checkLock) {
>  
>               if(checkLock && lockWriting) {
>                       Logger.verbose(this, "Writting lock, unable to write.");
> @@ -205,13 +221,7 @@
>  
>  
>               if(out != null && socket != null && socket.isConnected()) {
> -                     try {
> -                             out.write(toWrite.getBytes());
> -                     } catch(java.io.IOException e) {
> -                             Logger.warning(this, "Unable to write() on the 
> socket ?! : "+ e.toString());
> -                             disconnect();
> -                             return false;
> -                     }
> +                     bufferedOut.write(toWrite.getBytes());
>               } else {
>                       Logger.warning(this, "Cannot write if disconnected 
> !\n");
>                       return false;
> @@ -336,20 +346,4 @@
>               return null;
>       }
>  
> -
> -     /**
> -      * Use this when you want to fetch the data still waiting on the socket.
> -      */
> -     public InputStream getInputStream() {
> -             return in;
> -     }
> -
> -
> -     /**
> -      * Use this when you want to send raw data.
> -      */
> -     public OutputStream getOutputStream() {
> -             return out;
> -     }
> -
>  }
> 
> Modified: trunk/apps/Thaw/src/thaw/i18n/thaw.properties
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/i18n/thaw.properties     2006-07-15 21:40:40 UTC 
> (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/i18n/thaw.properties     2006-07-16 15:49:09 UTC 
> (rev 9630)
> @@ -83,6 +83,9 @@
>  
>  thaw.config.pluginsLoaded=Plugins loaded:
>  
> +thaw.config.maxUploadSpeed=Maximum upload speed in KB/s (-1 = unlimited)
> +
> +
>  ## Plugins
>  thaw.plugin.insert.fileToInsert=File to insert
>  thaw.plugin.insert.filesToInsert=File(s) to insert
> 
> Modified: trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties  2006-07-15 21:40:40 UTC 
> (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties  2006-07-16 15:49:09 UTC 
> (rev 9630)
> @@ -15,7 +15,7 @@
>  thaw.common.insertions=Insertions
>  thaw.common.downloads=Downloads
>  
> -thaw.common.fetch=Aller chercher
> +thaw.common.fetch=Récupérer
>  
>  thaw.common.file=Fichier
>  thaw.common.progress=Progrès
> @@ -83,6 +83,8 @@
>  
>  thaw.config.pluginsLoaded=Plugins chargés:
>  
> +thaw.config.maxUploadSpeed=Vitesse maximum d'envoi en Ko/s (-1 = illimité)
> +
>  ## Plugins
>  thaw.plugin.insert.fileToInsert=Fichier à insérer
>  thaw.plugin.insert.filesToInsert=Fichier(s) à insérer
> 
> Modified: trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java    
> 2006-07-15 21:40:40 UTC (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java    
> 2006-07-16 15:49:09 UTC (rev 9630)
> @@ -220,7 +220,7 @@
>  
>               mainPanel.setSize(400, 400);
>  
> -             globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 50, 
> 50));
> +             globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 10, 
> 10));
>  
>               globalPanel.add(mainPanel);
>       }
> 
> Modified: trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java    
> 2006-07-15 21:40:40 UTC (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java    
> 2006-07-16 15:49:09 UTC (rev 9630)
> @@ -153,7 +153,7 @@
>                       else
>                               
> key.setText(I18n.getMessage("thaw.common.unknown"));
>  
> -                     if(query.getFileSize() > 0)
> +                     if(query.getFileSize() == 0)
>                               
> size.setText(I18n.getMessage("thaw.common.unknown"));
>                       else
>                               size.setText((new 
> Long(query.getFileSize())).toString()+" B");
> 
> _______________________________________________
> Thaw mailing list
> Thaw at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/thaw
> 

-- 
Matthew J Toseland - toad at amphibian.dyndns.org
Freenet Project Official Codemonkey - http://freenetproject.org/
ICTHUS - Nothing is impossible. Our Boss says so.
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 189 bytes
Desc: Digital signature
URL: 
<https://emu.freenetproject.org/pipermail/thaw/attachments/20060719/a3b6b235/attachment.pgp>

Reply via email to