Why?
On Sun, Jul 16, 2006 at 03:49:12PM +0000, jflesch at freenetproject.org wrote:
> Author: jflesch
> Date: 2006-07-16 15:49:09 +0000 (Sun, 16 Jul 2006)
> New Revision: 9630
>
> Added:
> trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
> Modified:
> trunk/apps/Thaw/src/thaw/core/Config.java
> trunk/apps/Thaw/src/thaw/core/Core.java
> trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
> trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
> trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
> trunk/apps/Thaw/src/thaw/i18n/thaw.properties
> trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
> trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
> trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
> Log:
> Traffic shapper (for socket output only) is now implemented + Various bug
> fixes
>
> Modified: trunk/apps/Thaw/src/thaw/core/Config.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/core/Config.java 2006-07-15 21:40:40 UTC (rev
> 9629)
> +++ trunk/apps/Thaw/src/thaw/core/Config.java 2006-07-16 15:49:09 UTC (rev
> 9630)
> @@ -297,6 +297,7 @@
> setValue("nodePort", "9481");
> setValue("maxSimultaneousDownloads", "-1");
> setValue("maxSimultaneousInsertions", "-1");
> + setValue("maxUploadSpeed", "-1");
> setValue("thawId", "thaw_"+(new Integer((new
> Random()).nextInt(1000))).toString());
> }
>
>
> Modified: trunk/apps/Thaw/src/thaw/core/Core.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/core/Core.java 2006-07-15 21:40:40 UTC (rev
> 9629)
> +++ trunk/apps/Thaw/src/thaw/core/Core.java 2006-07-16 15:49:09 UTC (rev
> 9630)
> @@ -137,7 +137,8 @@
> }
>
> connection = new
> FCPConnection(config.getValue("nodeAddress"),
> - (new
> Integer(config.getValue("nodePort"))).intValue());
> + (new
> Integer(config.getValue("nodePort"))).intValue(),
> + (new
> Integer(config.getValue("maxUploadSpeed"))).intValue());
>
> if(!connection.connect()) {
> new WarningWindow(this, "Unable to connect to
> "+config.getValue("nodeAddress")+":"+
>
> Modified: trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java 2006-07-15
> 21:40:40 UTC (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/core/NodeConfigPanel.java 2006-07-16
> 15:49:09 UTC (rev 9630)
> @@ -29,6 +29,7 @@
> I18n.getMessage("thaw.config.nodePort"),
> I18n.getMessage("thaw.config.maxSimultaneousDownloads"),
> I18n.getMessage("thaw.config.maxSimultaneousInsertions"),
> + I18n.getMessage("thaw.config.maxUploadSpeed"),
> I18n.getMessage("thaw.config.thawId")
> };
>
> @@ -37,6 +38,7 @@
> "nodePort",
> "maxSimultaneousDownloads",
> "maxSimultaneousInsertions",
> + "maxUploadSpeed",
> "thawId"
> };
>
>
> Added: trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java 2006-07-15
> 21:40:40 UTC (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/fcp/FCPBufferedStream.java 2006-07-16
> 15:49:09 UTC (rev 9630)
> @@ -0,0 +1,166 @@
> +package thaw.fcp;
> +
> +import java.net.Socket;
> +import java.io.InputStream;
> +import java.io.OutputStream;
> +import java.io.BufferedInputStream;
> +import java.io.InputStreamReader;
> +
> +import thaw.core.Logger;
> +
> +/**
> + * Only used by FCPConnection. Except special situation, you shouldn't have
> to use it directly.
> + * Currently only used for output. (shouldn't be really usefull for input).
> + * Some data are sent each 'INTERVAL' (in ms).
> + */
> +public class FCPBufferedStream implements Runnable {
> + private FCPConnection connection;
> + private int maxUploadSpeed;
> +
> + private byte outputBuffer[];
> +
> + public final static int OUTPUT_BUFFER_SIZE = 102400;
> + public final static int INTERVAL = 200;
> +
> + private int waiting = 0; /* amount of data stored in the buffer */
> + private int readCursor = 0; /* indicates where the nex read will be */
> + private int writeCursor = 0; /* indicates where the next write will be
> */
> +
> + private Thread tractopelle = null;
> + private boolean running = true;
> + private int packetSize = 0;
> +
> +
> +
> + public FCPBufferedStream(FCPConnection connection,
> + int maxUploadSpeed) {
> + this.connection = connection;
> + this.maxUploadSpeed = maxUploadSpeed;
> +
> + if(maxUploadSpeed >= 0) {
> + outputBuffer = new byte[OUTPUT_BUFFER_SIZE];
> + packetSize = (maxUploadSpeed * 1024) / (1000/INTERVAL);
> + }
> + }
> +
> + /**
> + * Add to the buffer. Can block if buffer is full !
> + * Never send more than OUTPUT_BUFFER_SIZE.
> + */
> + public synchronized boolean write(byte[] data) {
> + if(maxUploadSpeed == -1) {
> + return connection.realRawWrite(data);
> + }
> +
> + while(waiting + data.length > OUTPUT_BUFFER_SIZE) {
> + sleep(INTERVAL);
> + }
> +
> + waiting += data.length;
> +
> + for(int i = 0 ; i < data.length ; i++) {
> + outputBuffer[writeCursor] = data[i];
> +
> + writeCursor++;
> +
> + if(writeCursor >= OUTPUT_BUFFER_SIZE)
> + writeCursor = 0;
> + }
> +
> + return true;
> + }
> +
> + /**
> + * @see write(byte[])
> + */
> + public boolean write(String data) {
> + return write(data.getBytes());
> + }
> +
> + /**
> + * extract from the buffer
> + */
> + private boolean readOutputBuffer(byte[] data) {
> + for(int i = 0; i < data.length ; i++) {
> + data[i] = outputBuffer[readCursor];
> +
> + readCursor++;
> +
> + if(readCursor >= OUTPUT_BUFFER_SIZE)
> + readCursor = 0;
> + }
> +
> + waiting -= data.length;
> +
> + return true;
> + }
> +
> + /**
> + * wait for the buffer being empty.
> + */
> + public void flush() {
> + while(waiting > 0) {
> + sleep(INTERVAL);
> + }
> + }
> +
> +
> + public void run() {
> + byte[] data;
> +
> + while(running) { /* Wild and freeeeeee */
> + if(waiting > 0) {
> + int to_read = packetSize;
> +
> + if(waiting < to_read)
> + to_read = waiting;
> +
> + data = new byte[to_read];
> +
> + readOutputBuffer(data);
> +
> + connection.realRawWrite(data);
> + }
> +
> + sleep(INTERVAL);
> + }
> + }
> +
> + /**
> + * Start the thread sending data from the buffer to the OutputStream
> (socket).
> + */
> + public boolean startSender() {
> + running = true;
> +
> + if(maxUploadSpeed < 0) {
> + Logger.notice(this, "startSender(): No upload limit.
> Not needed");
> + return false;
> + }
> +
> + if(tractopelle == null) {
> + tractopelle = new Thread(this);
> + tractopelle.start();
> + return true;
> + } else {
> + Logger.notice(this, "startSender(): Already started");
> + return false;
> + }
> + }
> +
> + public boolean stopSender() {
> + running = false;
> + tractopelle = null;
> + return true;
> + }
> +
> + /**
> + * Just ignore the InterruptedException.
> + */
> + private void sleep(int ms) {
> + try {
> + Thread.sleep(ms);
> + } catch(java.lang.InterruptedException e) {
> + /* just iggnnnnnooored */
> + }
> + }
> +}
>
> Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java 2006-07-15 21:40:40 UTC
> (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java 2006-07-16 15:49:09 UTC
> (rev 9630)
> @@ -162,9 +162,9 @@
> queueManager.getQueryManager().addObserver(this);
>
> progress = 0;
> - running = true;
> finished = false;
> successful = false;
> + running = false;
>
> if(keyType == 2 && privateKey == null) {
> generateSSK();
> @@ -240,6 +240,8 @@
> }
>
> public boolean continueInsert() {
> + running = true; /* Here we are really running */
> +
> FCPConnection connection =
> queueManager.getQueryManager().getConnection();
>
> connection.lockWriting();
> @@ -431,10 +433,11 @@
> publicKey = msg.getValue("URI");
> publicKey = publicKey.replaceAll("freenet:",
> "");
>
> - /*
> +
> if(keyType == 0)
> publicKey = publicKey + "/" +name;
>
> + /*
> if(keyType > 0)
> publicKey = publicKey + "/" + name +
> "-" + (new Integer(rev)).toString();
> */
> @@ -454,7 +457,7 @@
> publicKey = publicKey.replaceAll("freenet:",
> "");
>
> if(keyType == 0)
> - publicKey = publicKey + name;
> + publicKey = publicKey + "/" + name;
> if(keyType == 1)
> publicKey = "KSK@"+name+"-" + (new
> Integer(rev)).toString();
> if(keyType == 2)
> @@ -603,20 +606,26 @@
> setChanged();
> notifyObservers();
>
> - FCPMessage msg = new FCPMessage();
> - msg.setMessageName("RemovePersistentRequest");
> - msg.setValue("Identifier", identifier);
> -
> - if(global)
> - msg.setValue("Global", "true");
> - else
> - msg.setValue("Global", "false");
> + if(isRunning() || isFinished()) {
> + FCPMessage msg = new FCPMessage();
> + msg.setMessageName("RemovePersistentRequest");
> + msg.setValue("Identifier", identifier);
> +
> + if(global)
> + msg.setValue("Global", "true");
> + else
> + msg.setValue("Global", "false");
> +
> + queueManager.getQueryManager().writeMessage(msg);
> +
> + running = false;
> +
> + queueManager.getQueryManager().deleteObserver(this);
> + } else {
> + Logger.notice(this, "Nothing to remove");
> + }
>
> - queueManager.getQueryManager().writeMessage(msg);
>
> - running = false;
> -
> - queueManager.getQueryManager().deleteObserver(this);
> return true;
> }
>
>
> Modified: trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java 2006-07-15 21:40:40 UTC
> (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java 2006-07-16 15:49:09 UTC
> (rev 9630)
> @@ -17,9 +17,13 @@
> * After being instanciated, you should commit it to the FCPQueryManager,
> and then
> * commit the FCPQueryManager to the FCPQueueManager.
> * Call observer when connected / disconnected.
> + * WARNING: This FCP implement don't guarantee that messages are sent in the
> same order than initally put
> + * if the lock on writting is not set !
> * TODO: Add functions socketToFile(long size, File file) /
> fileToSocket(File file)
> */
> public class FCPConnection extends Observable {
> + private FCPBufferedStream bufferedOut = null;
> + private int maxUploadSpeed = 0;
>
> private String nodeAddress = null;
> private int port = 0;
> @@ -41,9 +45,11 @@
>
> /**
> * Don't connect. Call connect() for that.
> + * @param maxUploadSpeed in KB: -1 means no limit
> */
> public FCPConnection(String nodeAddress,
> - int port)
> + int port,
> + int maxUploadSpeed)
> {
> if(DEBUG_MODE) {
> Logger.notice(this, "DEBUG_MODE ACTIVATED");
> @@ -51,22 +57,21 @@
>
> setNodeAddress(nodeAddress);
> setNodePort(port);
> + setMaxUploadSpeed(maxUploadSpeed);
> }
>
>
> - /**
> - * You will probably have to use resetQueue() from the FCPQueueManager
> after using this function.
> - */
> public void setNodeAddress(String nodeAddress) {
> this.nodeAddress = nodeAddress;
> }
>
> - /**
> - * You will probably have to use resetQueue() from the FCPQueueManager
> after using this function.
> - */
> public void setNodePort(int port) {
> this.port = port;
> }
> +
> + public void setMaxUploadSpeed(int max) {
> + this.maxUploadSpeed = max;
> + }
>
>
> public void disconnect() {
> @@ -83,6 +88,8 @@
> socket = null;
> in = null;
> out = null;
> + bufferedOut.stopSender();
> + bufferedOut = null;
>
> setChanged();
> notifyObservers();
> @@ -133,6 +140,8 @@
> }
>
> reader = new BufferedInputStream(in);
> + bufferedOut = new FCPBufferedStream(this, maxUploadSpeed);
> + bufferedOut.startSender();
>
> Logger.info(this, "Connected");
>
> @@ -167,6 +176,13 @@
> * Doesn't check the lock state ! You have to manage it yourself.
> */
> public boolean rawWrite(byte[] data) {
> + return bufferedOut.write(data);
> + }
> +
> + /**
> + * Should be call by FCPBufferedStream. Not you.
> + */
> + public boolean realRawWrite(byte[] data) {
> if(out != null && socket != null && socket.isConnected()) {
> try {
> out.write(data);
> @@ -182,11 +198,11 @@
> return true;
> }
>
> - public synchronized boolean write(String toWrite) {
> + public boolean write(String toWrite) {
> return write(toWrite, true);
> }
>
> - public synchronized boolean write(String toWrite, boolean checkLock) {
> + public boolean write(String toWrite, boolean checkLock) {
>
> if(checkLock && lockWriting) {
> Logger.verbose(this, "Writting lock, unable to write.");
> @@ -205,13 +221,7 @@
>
>
> if(out != null && socket != null && socket.isConnected()) {
> - try {
> - out.write(toWrite.getBytes());
> - } catch(java.io.IOException e) {
> - Logger.warning(this, "Unable to write() on the
> socket ?! : "+ e.toString());
> - disconnect();
> - return false;
> - }
> + bufferedOut.write(toWrite.getBytes());
> } else {
> Logger.warning(this, "Cannot write if disconnected
> !\n");
> return false;
> @@ -336,20 +346,4 @@
> return null;
> }
>
> -
> - /**
> - * Use this when you want to fetch the data still waiting on the socket.
> - */
> - public InputStream getInputStream() {
> - return in;
> - }
> -
> -
> - /**
> - * Use this when you want to send raw data.
> - */
> - public OutputStream getOutputStream() {
> - return out;
> - }
> -
> }
>
> Modified: trunk/apps/Thaw/src/thaw/i18n/thaw.properties
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/i18n/thaw.properties 2006-07-15 21:40:40 UTC
> (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/i18n/thaw.properties 2006-07-16 15:49:09 UTC
> (rev 9630)
> @@ -83,6 +83,9 @@
>
> thaw.config.pluginsLoaded=Plugins loaded:
>
> +thaw.config.maxUploadSpeed=Maximum upload speed in KB/s (-1 = unlimited)
> +
> +
> ## Plugins
> thaw.plugin.insert.fileToInsert=File to insert
> thaw.plugin.insert.filesToInsert=File(s) to insert
>
> Modified: trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties 2006-07-15 21:40:40 UTC
> (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/i18n/thaw_fr.properties 2006-07-16 15:49:09 UTC
> (rev 9630)
> @@ -15,7 +15,7 @@
> thaw.common.insertions=Insertions
> thaw.common.downloads=Downloads
>
> -thaw.common.fetch=Aller chercher
> +thaw.common.fetch=R?cup?rer
>
> thaw.common.file=Fichier
> thaw.common.progress=Progr?s
> @@ -83,6 +83,8 @@
>
> thaw.config.pluginsLoaded=Plugins charg?s:
>
> +thaw.config.maxUploadSpeed=Vitesse maximum d'envoi en Ko/s (-1 = illimit?)
> +
> ##?Plugins
> thaw.plugin.insert.fileToInsert=Fichier ? ins?rer
> thaw.plugin.insert.filesToInsert=Fichier(s) ? ins?rer
>
> Modified: trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
> 2006-07-15 21:40:40 UTC (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/plugins/insertPlugin/InsertPanel.java
> 2006-07-16 15:49:09 UTC (rev 9630)
> @@ -220,7 +220,7 @@
>
> mainPanel.setSize(400, 400);
>
> - globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 50,
> 50));
> + globalPanel.setLayout(new FlowLayout(FlowLayout.CENTER, 10,
> 10));
>
> globalPanel.add(mainPanel);
> }
>
> Modified: trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
> ===================================================================
> --- trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
> 2006-07-15 21:40:40 UTC (rev 9629)
> +++ trunk/apps/Thaw/src/thaw/plugins/queueWatcher/DetailPanel.java
> 2006-07-16 15:49:09 UTC (rev 9630)
> @@ -153,7 +153,7 @@
> else
>
> key.setText(I18n.getMessage("thaw.common.unknown"));
>
> - if(query.getFileSize() > 0)
> + if(query.getFileSize() == 0)
>
> size.setText(I18n.getMessage("thaw.common.unknown"));
> else
> size.setText((new
> Long(query.getFileSize())).toString()+" B");
>
> _______________________________________________
> Thaw mailing list
> Thaw at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/thaw
>
--
Matthew J Toseland - toad at amphibian.dyndns.org
Freenet Project Official Codemonkey - http://freenetproject.org/
ICTHUS - Nothing is impossible. Our Boss says so.
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 189 bytes
Desc: Digital signature
URL:
<https://emu.freenetproject.org/pipermail/thaw/attachments/20060719/a3b6b235/attachment.pgp>