Author: jflesch
Date: 2006-12-05 23:24:03 +0000 (Tue, 05 Dec 2006)
New Revision: 11253
Modified:
trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java
trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
trunk/apps/Thaw/src/thaw/fcp/FCPQueueLoader.java
trunk/apps/Thaw/src/thaw/fcp/FCPQueueManager.java
Log:
Locks between transfers should be more efficient (thanks to nextgens)
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java 2006-12-05 23:04:34 UTC
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPClientGet.java 2006-12-05 23:24:03 UTC
(rev 11253)
@@ -291,8 +291,8 @@
}
if(this.isLockOwner) {
-
this.queueManager.getQueryManager().getConnection().unlockReading();
-
this.queueManager.getQueryManager().getConnection().unlockWriting();
+ if (duplicatedQueryManager != null)
+
duplicatedQueryManager.getConnection().removeFromWriterQueue();
this.isLockOwner= false;
}
@@ -408,10 +408,8 @@
Logger.info(this, "File received");
- queryManager.getConnection().unlockReading();
- queryManager.getConnection().unlockWriting();
+
duplicatedQueryManager.getConnection().removeFromWriterQueue();
-
this.isLockOwner= false;
this.running = false;
@@ -453,44 +451,10 @@
}
public void run() {
- if(this.dir == null) {
- Logger.warning(this, "UnlockWaiter.run() : Wtf
?");
- }
-
- try {
- Thread.sleep((new
java.util.Random()).nextInt(1500));
- } catch(java.lang.InterruptedException e) {
-
- }
-
- while(true) {
- if(!this.connection.isReadingLocked()
- && (!this.connection.isWritingLocked()))
- break;
-
- try {
- Thread.sleep(500);
- } catch(java.lang.InterruptedException e) {
-
- }
- }
-
- if(!this.connection.lockReading()) {
- /* Ah ben ou? mais non */
- this.run(); /* TODO: It's dirty => To change !
*/
- return;
- }
-
- if(!this.connection.lockWriting()) {
- /* Ah ben ou? mais non */
- this.connection.unlockReading();
- this.run(); /* TODO: It's dirty => To change !
*/
- return;
- }
-
+ connection.addToWriterQueue();
FCPClientGet.this.isLockOwner = true;
- Logger.debug(this, "I take the reading lock !");
+ Logger.debug(this, "I take the lock !");
if(this.dir == null) {
Logger.warning(this, "UnlockWaiter.run() : Wtf
?");
@@ -530,8 +494,8 @@
Logger.info(this, "Duplicating socket ...");
- this.duplicatedQueryManager =
this.queueManager.getQueryManager().duplicate(this.identifier);
- this.duplicatedQueryManager.addObserver(this);
+ duplicatedQueryManager =
this.queueManager.getQueryManager().duplicate(this.identifier);
+ duplicatedQueryManager.addObserver(this);
Logger.info(this, "Waiting for socket ...");
this.status = "Waiting for socket availability ...";
@@ -542,7 +506,7 @@
this.notifyObservers();
- Thread fork = new Thread(new UnlockWaiter(this,
this.duplicatedQueryManager.getConnection(), dir));
+ Thread fork = new Thread(new UnlockWaiter(this,
duplicatedQueryManager.getConnection(), dir));
fork.start();
return true;
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java 2006-12-05 23:04:34 UTC
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPClientPut.java 2006-12-05 23:24:03 UTC
(rev 11253)
@@ -12,7 +12,6 @@
/**
* Allow to insert a simple file.
- * TODO: Use streams instead of reading directly the file.
*/
public class FCPClientPut extends Observable implements FCPTransferQuery,
Observer {
public final static int DEFAULT_INSERTION_PRIORITY = 4;
@@ -291,24 +290,8 @@
}
public void run() {
- while(true) {
- if(!this.connection.isWritingLocked())
- break;
+ connection.addToWriterQueue();
- try {
- Thread.sleep(200);
- } catch(java.lang.InterruptedException e) {
-
- }
-
- }
-
- if(!this.connection.lockWriting()) {
- /* Ah ben ou? mais non ... */
- this.run();
- return;
- }
-
FCPClientPut.this.lockOwner = true;
this.clientPut.continueInsert();
@@ -409,7 +392,7 @@
boolean ret = this.sendFile();
Logger.info(this, "File sent (or not :p)");
- connection.unlockWriting();
+ connection.removeFromWriterQueue();
this.lockOwner = false;
this.sending = false;
@@ -656,7 +639,7 @@
if(this.lockOwner) {
this.lockOwner = false;
-
this.queueManager.getQueryManager().getConnection().unlockWriting();
+
this.queueManager.getQueryManager().getConnection().removeFromWriterQueue();
}
this.status = "Protocol error
("+msg.getValue("CodeDescription")+")";
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java 2006-12-05 23:04:34 UTC
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPConnection.java 2006-12-05 23:24:03 UTC
(rev 11253)
@@ -42,10 +42,9 @@
private long rawBytesWaiting = 0;
+ private int writersWaiting;
+ private Object monitor;
- private boolean lockWriting = false;
- private boolean lockReading = false;
-
private long lastWrite = 0; /* real writes ; System.currentTimeMillis()
*/
private boolean duplicationAllowed = true;
@@ -65,6 +64,8 @@
Logger.notice(this, "DEBUG_MODE ACTIVATED");
}
+ monitor = new Object();
+
maxUploadSpeed = -1;
this.setNodeAddress(nodeAddress);
@@ -72,8 +73,7 @@
this.setMaxUploadSpeed(maxUploadSpeed);
this.setDuplicationAllowed(duplicationAllowed);
- this.lockWriting = false;
- this.lockReading = false;
+ writersWaiting = 0;
}
@@ -165,8 +165,7 @@
this.bufferedOut.startSender();
this.rawBytesWaiting = 0;
- this.lockWriting = false;
- this.lockReading = false;
+ writersWaiting = 0;
this.lastWrite = 0;
Logger.info(this, "Connected");
@@ -189,58 +188,7 @@
}
- public synchronized boolean lockWriting() {
- if(this.lockWriting) {
- Logger.notice(this, "Writing already locked! You can't
lock it !");
- return false;
- }
- Logger.debug(this, "Lock writing ...");
- this.lockWriting = true;
-
- return true;
- }
-
- public synchronized boolean lockReading() {
- if(this.lockReading) {
- Logger.notice(this, "Reading already locked! You can't
lock it !");
- return false;
- }
-
- Logger.debug(this, "Lock reading");
- this.lockReading = true;
-
- return true;
- }
-
- public synchronized void unlockWriting() {
- if(!this.lockWriting) {
- Logger.notice(this, "Writing already unlocked !");
- return;
- }
-
- Logger.debug(this, "Unlock writting");
- this.lockWriting = false;
- }
-
- public synchronized void unlockReading() {
- if(!this.lockReading) {
- Logger.notice(this, "Reading already unlocked !");
- return;
- }
-
- Logger.debug(this, "Unlock reading");
- this.lockReading = false;
- }
-
- public boolean isWritingLocked() {
- return this.lockWriting;
- }
-
- public boolean isReadingLocked() {
- return this.lockReading;
- }
-
/**
* Doesn't check the lock state ! You have to manage it yourself.
*/
@@ -277,12 +225,41 @@
return true;
}
+ /**
+ * @return true if it must wait on this FCPConnection object
+ */
+ public void addToWriterQueue() {
+ synchronized(monitor) {
+ writersWaiting++;
+ if (writersWaiting > 1) {
+ try {
+ monitor.wait();
+ } catch(java.lang.InterruptedException e) {
+ Logger.warning(this, "Interrupted while
waiting ?!");
+ }
+ }
+
+ return;
+ }
+ }
+
+ public void removeFromWriterQueue() {
+ synchronized(monitor) {
+ writersWaiting--;
+ if (writersWaiting < 0) {
+ Logger.warning(this, "Negative number of
writers ?!");
+ writersWaiting = 0;
+ }
+ monitor.notify();
+ }
+ }
+
public boolean isWriting() {
if( !this.isConnected() )
return false;
- return ( this.isWritingLocked() || ((System.currentTimeMillis()
- this.lastWrite) < 300) );
+ return ( writersWaiting > 0 || ((System.currentTimeMillis() -
this.lastWrite) < 300) );
}
public boolean write(String toWrite) {
@@ -291,33 +268,30 @@
public boolean write(String toWrite, boolean checkLock) {
- if(checkLock && this.isWritingLocked()) {
- Logger.verbose(this, "Writting lock, unable to write.");
+ if (checkLock) {
+ addToWriterQueue();
}
- while(checkLock && this.isWritingLocked()) {
- try {
- Thread.sleep(200);
- } catch(java.lang.InterruptedException e) {
- /* On s'en fout, mais alors d'une force ... */
- }
- }
-
Logger.asIt(this, "Thaw >>> Node :");
Logger.asIt(this, toWrite);
if(this.out != null && this.socket != null &&
this.socket.isConnected()) {
try {
-
this.bufferedOut.write(toWrite.getBytes("UTF-8"));
+ bufferedOut.write(toWrite.getBytes("UTF-8"));
} catch(java.io.UnsupportedEncodingException e) {
Logger.error(this, "UNSUPPORTED ENCODING
EXCEPTION : UTF-8");
- this.bufferedOut.write(toWrite.getBytes());
+ bufferedOut.write(toWrite.getBytes());
}
} else {
Logger.warning(this, "Cannot write if disconnected
!\n");
+ if (checkLock)
+ removeFromWriterQueue();
return false;
}
+
+ if (checkLock)
+ removeFromWriterQueue();
return true;
}
@@ -333,14 +307,14 @@
* @see #read(int, byte[])
*/
public int read(byte[] buf) {
- return this.read(buf.length, buf);
+ return read(buf.length, buf);
}
/**
* @param lng Obsolete.
* @return -1 Disconnection.
*/
- public synchronized int read(int lng, byte[] buf) {
+ public int read(int lng, byte[] buf) {
int rdBytes = 0;
try {
rdBytes = this.reader.read(buf);
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPQueueLoader.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPQueueLoader.java 2006-12-05 23:04:34 UTC
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPQueueLoader.java 2006-12-05 23:24:03 UTC
(rev 11253)
@@ -28,7 +28,7 @@
boolean ret = listPersistent.start(queueManager);
if(ret)
-
queueManager.getQueryManager().getConnection().lockWriting();
+
queueManager.getQueryManager().getConnection().addToWriterQueue();
return ret;
}
@@ -126,7 +126,7 @@
if("EndListPersistentRequests".equals( msg.getMessageName() )) {
Logger.info(this, "End Of ListPersistentRequests.");
-
this.queueManager.getQueryManager().getConnection().unlockWriting();
+
this.queueManager.getQueryManager().getConnection().removeFromWriterQueue();
this.queueManager.setQueueCompleted();
return;
}
Modified: trunk/apps/Thaw/src/thaw/fcp/FCPQueueManager.java
===================================================================
--- trunk/apps/Thaw/src/thaw/fcp/FCPQueueManager.java 2006-12-05 23:04:34 UTC
(rev 11252)
+++ trunk/apps/Thaw/src/thaw/fcp/FCPQueueManager.java 2006-12-05 23:24:03 UTC
(rev 11253)
@@ -518,8 +518,8 @@
return;
try {
-
if(this.queryManager.getConnection().isConnected()
- &&
!this.queryManager.getConnection().isWritingLocked()
+ if(queryManager.getConnection().isConnected()
+ && !queryManager.getConnection().isWriting()
&& queueCompleted) {
this.schedule();