Author: jflesch
Date: 2006-12-05 23:24:03 +0000 (Tue, 05 Dec 2006)
New Revision: 11253

Modified:
   trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java
   trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
   trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
   trunk/apps/Thaw/src/thaw/fcp/FCPQueueLoader.java
   trunk/apps/Thaw/src/thaw/fcp/FCPQueueManager.java
Log:
Locks between transfers should be more efficient (thanks to nextgens)

Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java      2006-12-05 23:04:34 UTC 
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java      2006-12-05 23:24:03 UTC 
(rev 11253)
@@ -291,8 +291,8 @@
                        }

                        if(this.isLockOwner) {
-                               
this.queueManager.getQueryManager().getConnection().unlockReading();
-                               
this.queueManager.getQueryManager().getConnection().unlockWriting();
+                               if (duplicatedQueryManager != null)
+                                       
duplicatedQueryManager.getConnection().removeFromWriterQueue();
                                this.isLockOwner= false;
                        }

@@ -408,10 +408,8 @@

                        Logger.info(this, "File received");

-                       queryManager.getConnection().unlockReading();
-                       queryManager.getConnection().unlockWriting();
+                       
duplicatedQueryManager.getConnection().removeFromWriterQueue();

-
                        this.isLockOwner= false;

                        this.running = false;
@@ -453,44 +451,10 @@
                }

                public void run() {
-                       if(this.dir == null) {
-                               Logger.warning(this, "UnlockWaiter.run() : Wtf 
?");
-                       }
-
-                       try {
-                               Thread.sleep((new 
java.util.Random()).nextInt(1500));
-                       } catch(java.lang.InterruptedException e) {
-
-                       }
-
-                       while(true) {
-                               if(!this.connection.isReadingLocked()
-                                  && (!this.connection.isWritingLocked()))
-                                       break;
-
-                               try {
-                                       Thread.sleep(500);
-                               } catch(java.lang.InterruptedException e) {
-
-                               }
-                       }
-
-                       if(!this.connection.lockReading()) {
-                               /* Ah ben ou? mais non */
-                               this.run(); /* TODO: It's dirty => To change ! 
*/
-                               return;
-                       }
-
-                       if(!this.connection.lockWriting()) {
-                               /* Ah ben ou? mais non */
-                               this.connection.unlockReading();
-                               this.run(); /* TODO: It's dirty => To change ! 
*/
-                               return;
-                       }
-
+                       connection.addToWriterQueue();
                        FCPClientGet.this.isLockOwner = true;

-                       Logger.debug(this, "I take the reading lock !");
+                       Logger.debug(this, "I take the lock !");

                        if(this.dir == null) {
                                Logger.warning(this, "UnlockWaiter.run() : Wtf 
?");
@@ -530,8 +494,8 @@

                Logger.info(this, "Duplicating socket ...");

-               this.duplicatedQueryManager = 
this.queueManager.getQueryManager().duplicate(this.identifier);
-               this.duplicatedQueryManager.addObserver(this);
+               duplicatedQueryManager = 
this.queueManager.getQueryManager().duplicate(this.identifier);
+               duplicatedQueryManager.addObserver(this);

                Logger.info(this, "Waiting for socket  ...");
                this.status = "Waiting for socket availability ...";
@@ -542,7 +506,7 @@
                this.notifyObservers();


-               Thread fork = new Thread(new UnlockWaiter(this, 
this.duplicatedQueryManager.getConnection(), dir));
+               Thread fork = new Thread(new UnlockWaiter(this, 
duplicatedQueryManager.getConnection(), dir));
                fork.start();

                return true;

Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java      2006-12-05 23:04:34 UTC 
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java      2006-12-05 23:24:03 UTC 
(rev 11253)
@@ -12,7 +12,6 @@

 /**
  * Allow to insert a simple file.
- * TODO: Use streams instead of reading directly the file.
  */
 public class FCPClientPut extends Observable implements FCPTransferQuery, 
Observer {
        public final static int DEFAULT_INSERTION_PRIORITY = 4;
@@ -291,24 +290,8 @@
                }

                public void run() {
-                       while(true) {
-                               if(!this.connection.isWritingLocked())
-                                       break;
+                       connection.addToWriterQueue();

-                               try {
-                                       Thread.sleep(200);
-                               } catch(java.lang.InterruptedException e) {
-
-                               }
-
-                       }
-
-                       if(!this.connection.lockWriting()) {
-                               /* Ah ben ou? mais non ... */
-                               this.run();
-                               return;
-                       }
-
                        FCPClientPut.this.lockOwner = true;

                        this.clientPut.continueInsert();
@@ -409,7 +392,7 @@
                boolean ret = this.sendFile();
                Logger.info(this, "File sent (or not :p)");

-               connection.unlockWriting();
+               connection.removeFromWriterQueue();
                this.lockOwner = false;
                this.sending = false;

@@ -656,7 +639,7 @@

                                if(this.lockOwner) {
                                        this.lockOwner = false;
-                                       
this.queueManager.getQueryManager().getConnection().unlockWriting();
+                                       
this.queueManager.getQueryManager().getConnection().removeFromWriterQueue();
                                }

                                this.status = "Protocol error 
("+msg.getValue("CodeDescription")+")";

Modified: trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java     2006-12-05 23:04:34 UTC 
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java     2006-12-05 23:24:03 UTC 
(rev 11253)
@@ -42,10 +42,9 @@

        private long rawBytesWaiting = 0;

+       private int writersWaiting;
+       private Object monitor;

-       private boolean lockWriting = false;
-       private boolean lockReading = false;
-
        private long lastWrite = 0; /* real writes ; System.currentTimeMillis() 
*/

        private boolean duplicationAllowed = true;
@@ -65,6 +64,8 @@
                        Logger.notice(this, "DEBUG_MODE ACTIVATED");
                }

+               monitor = new Object();
+
                maxUploadSpeed = -1;

                this.setNodeAddress(nodeAddress);
@@ -72,8 +73,7 @@
                this.setMaxUploadSpeed(maxUploadSpeed);
                this.setDuplicationAllowed(duplicationAllowed);

-               this.lockWriting = false;
-               this.lockReading = false;
+               writersWaiting = 0;
        }


@@ -165,8 +165,7 @@
                this.bufferedOut.startSender();

                this.rawBytesWaiting = 0;
-               this.lockWriting = false;
-               this.lockReading = false;
+               writersWaiting = 0;
                this.lastWrite = 0;

                Logger.info(this, "Connected");
@@ -189,58 +188,7 @@
        }


-       public synchronized boolean lockWriting() {
-               if(this.lockWriting) {
-                       Logger.notice(this, "Writing already locked! You can't 
lock it !");
-                       return false;
-               }

-               Logger.debug(this, "Lock writing ...");
-               this.lockWriting = true;
-
-               return true;
-       }
-
-       public synchronized boolean lockReading() {
-               if(this.lockReading) {
-                       Logger.notice(this, "Reading already locked! You can't 
lock it !");
-                       return false;
-               }
-
-               Logger.debug(this, "Lock reading");
-               this.lockReading = true;
-
-               return true;
-       }
-
-       public synchronized void unlockWriting() {
-               if(!this.lockWriting) {
-                       Logger.notice(this, "Writing already unlocked !");
-                       return;
-               }
-
-               Logger.debug(this, "Unlock writting");
-               this.lockWriting = false;
-       }
-
-       public synchronized void unlockReading() {
-               if(!this.lockReading) {
-                       Logger.notice(this, "Reading already unlocked !");
-                       return;
-               }
-
-               Logger.debug(this, "Unlock reading");
-               this.lockReading = false;
-       }
-
-       public boolean isWritingLocked() {
-               return this.lockWriting;
-       }
-
-       public boolean isReadingLocked() {
-               return this.lockReading;
-       }
-
        /**
         * Doesn't check the lock state ! You have to manage it yourself.
         */
@@ -277,12 +225,41 @@
                return true;
        }

+       /**
+        * Increments the writer count and, if another writer is already counted, waits on the monitor until notified. Returns nothing.
+        */
+       public void addToWriterQueue() {
+               synchronized(monitor) {
+                       writersWaiting++;

+                       if (writersWaiting > 1) {
+                               try {
+                                       monitor.wait();
+                               } catch(java.lang.InterruptedException e) {
+                                       Logger.warning(this, "Interrupted while 
waiting ?!");
+                               }
+                       }
+
+                       return;
+               }
+       }
+
+       public void removeFromWriterQueue() {
+               synchronized(monitor) {
+                       writersWaiting--;
+                       if (writersWaiting < 0) {
+                               Logger.warning(this, "Negative number of 
writers ?!");
+                               writersWaiting = 0;
+                       }
+                       monitor.notify();
+               }
+       }
+
        public boolean isWriting() {
                if( !this.isConnected() )
                        return false;

-               return ( this.isWritingLocked() || ((System.currentTimeMillis() 
- this.lastWrite) < 300) );
+               return ( writersWaiting > 0 || ((System.currentTimeMillis() - 
this.lastWrite) < 300) );
        }

        public boolean write(String toWrite) {
@@ -291,33 +268,30 @@

        public boolean write(String toWrite, boolean checkLock) {

-               if(checkLock && this.isWritingLocked()) {
-                       Logger.verbose(this, "Writting lock, unable to write.");
+               if (checkLock) {
+                       addToWriterQueue();
                }

-               while(checkLock && this.isWritingLocked()) {
-                       try {
-                               Thread.sleep(200);
-                       } catch(java.lang.InterruptedException e) {
-                               /* On s'en fout, mais alors d'une force ... */
-                       }
-               }
-
                Logger.asIt(this, "Thaw >>> Node :");
                Logger.asIt(this, toWrite);


                if(this.out != null && this.socket != null && 
this.socket.isConnected()) {
                        try {
-                               
this.bufferedOut.write(toWrite.getBytes("UTF-8"));
+                               bufferedOut.write(toWrite.getBytes("UTF-8"));
                        } catch(java.io.UnsupportedEncodingException e) {
                                Logger.error(this, "UNSUPPORTED ENCODING 
EXCEPTION : UTF-8");
-                               this.bufferedOut.write(toWrite.getBytes());
+                               bufferedOut.write(toWrite.getBytes());
                        }
                } else {
                        Logger.warning(this, "Cannot write if disconnected 
!\n");
+                       if (checkLock)
+                               removeFromWriterQueue();
                        return false;
                }
+
+               if (checkLock)
+                       removeFromWriterQueue();
                return true;
        }

@@ -333,14 +307,14 @@
         * @see #read(int, byte[])
         */
        public int read(byte[] buf) {
-               return this.read(buf.length, buf);
+               return read(buf.length, buf);
        }

        /**
         * @param lng Obsolete.
         * @return -1 Disconnection.
         */
-       public synchronized int read(int lng, byte[] buf) {
+       public int read(int lng, byte[] buf) {
                int rdBytes = 0;
                try {
                        rdBytes = this.reader.read(buf);

Modified: trunk/apps/Thaw/src/thaw/fcp/FCPQueueLoader.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPQueueLoader.java    2006-12-05 23:04:34 UTC 
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPQueueLoader.java    2006-12-05 23:24:03 UTC 
(rev 11253)
@@ -28,7 +28,7 @@
                boolean ret = listPersistent.start(queueManager);

                if(ret)
-                       
queueManager.getQueryManager().getConnection().lockWriting();
+                       
queueManager.getQueryManager().getConnection().addToWriterQueue();

                return ret;
        }
@@ -126,7 +126,7 @@

                if("EndListPersistentRequests".equals( msg.getMessageName() )) {
                        Logger.info(this, "End Of ListPersistentRequests.");
-                       
this.queueManager.getQueryManager().getConnection().unlockWriting();
+                       
this.queueManager.getQueryManager().getConnection().removeFromWriterQueue();
                        this.queueManager.setQueueCompleted();
                        return;
                }

Modified: trunk/apps/Thaw/src/thaw/fcp/FCPQueueManager.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPQueueManager.java   2006-12-05 23:04:34 UTC 
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPQueueManager.java   2006-12-05 23:24:03 UTC 
(rev 11253)
@@ -518,8 +518,8 @@
                                return;

                        try {
-                               
if(this.queryManager.getConnection().isConnected()
-                                  && 
!this.queryManager.getConnection().isWritingLocked()
+                               if(queryManager.getConnection().isConnected()
+                                  && !queryManager.getConnection().isWriting()
                                   && queueCompleted) {

                                        this.schedule();


Reply via email to