Author: arnaudsimon
Date: Thu Nov 29 06:24:38 2007
New Revision: 599452

URL: http://svn.apache.org/viewvc?rev=599452&view=rev
Log:
added flush

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=599452&r1=599451&r2=599452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Thu Nov 29 06:24:38 2007
@@ -31,14 +31,26 @@
                 // use default size
                 MAX_NOT_SYNC_DATA_LENGH = 200000000;
             }
+            String flush = "message_size_before_flush";
+            try
+            {
+                MAX_NOT_FLUSH_DATA_LENGH = new Long(System.getProperties().getProperty(flush, "2000000"));
+            }
+            catch (NumberFormatException e)
+            {
+                // use default size
+                MAX_NOT_FLUSH_DATA_LENGH = 20000000;
+            }
     }
 
     private static  long MAX_NOT_SYNC_DATA_LENGH;
+     private static  long MAX_NOT_FLUSH_DATA_LENGH;
     private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
     private ClosedListener _exceptionListner;
     private RangeSet _acquiredMessages;
     private RangeSet _rejectedMessages;
     private long _currentDataSizeNotSynced;
+    private long _currentDataSizeNotFlushed;
 
 
     public void messageAcknowledge(RangeSet ranges)
@@ -80,6 +92,7 @@
     public void data(ByteBuffer buf)
     {
         _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
+        _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining();
         super.data(buf);
     }
 
@@ -122,6 +135,11 @@
         if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH)
         {
             sync();
+        }
+         if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH)
+        {
+           executionFlush();
+            _currentDataSizeNotFlushed = 0;
         }
     }
 


Reply via email to