Author: jcompagner
Date: Wed Feb 21 14:57:38 2007
New Revision: 510288

URL: http://svn.apache.org/viewvc?view=rev&rev=510288
Log:
2 threads now (saving and serializing) removing sync blocks as much as possible

Modified:
incubator/wicket/branches/wicket-1.x/wicket/src/main/java/wicket/protocol/http/FilePageStore.java
Modified:
incubator/wicket/branches/wicket-1.x/wicket/src/main/java/wicket/protocol/http/FilePageStore.java
URL:
http://svn.apache.org/viewvc/incubator/wicket/branches/wicket-1.x/wicket/src/main/java/wicket/protocol/http/FilePageStore.java?view=diff&rev=510288&r1=510287&r2=510288
==============================================================================
--- incubator/wicket/branches/wicket-1.x/wicket/src/main/java/wicket/protocol/http/FilePageStore.java (original)
+++ incubator/wicket/branches/wicket-1.x/wicket/src/main/java/wicket/protocol/http/FilePageStore.java Wed Feb 21 14:57:38 2007
@@ -56,23 +56,18 @@
private final File defaultWorkDir;
- private final ConcurrentHashMap storePageMap;
+ private final PageSerializingThread serThread;
+ private final ConcurrentHashMap pagesToBeSerialized;
- private final PageSavingThread thread;
+
+ private final PageSavingThread saveThread;
+ private final ConcurrentHashMap pagesToBeSaved;
- private final String appName;
-
- private volatile long totalSavingTime = 0;
- private volatile long totalSerializationTime = 0;
-
- private volatile int saved;
- private volatile int bytesSaved;
+ private final String appName;
private volatile int serialized;
-
- private int pagesInMap;
-
+ private volatile long totalSerializationTime = 0;
/**
* Construct.
@@ -92,13 +87,21 @@
public FilePageStore(File dir)
{
defaultWorkDir = dir;
- storePageMap = new ConcurrentHashMap();
- thread = new PageSavingThread();
+ pagesToBeSerialized = new ConcurrentHashMap();
+ pagesToBeSaved = new ConcurrentHashMap();
appName = Application.get().getApplicationKey();
- Thread t = new Thread(thread, "FilePageStoreThread-" + appName);
+
+ saveThread = new PageSavingThread();
+ Thread t = new Thread(saveThread, "FilePageSavingThread-" +
appName);
t.setDaemon(true);
+ t.setPriority(Thread.MAX_PRIORITY);
t.start();
+ serThread = new PageSerializingThread();
+ t = new Thread(serThread, "FilePageSerializingThread-" +
appName);
+ t.setDaemon(true);
+ t.setPriority(Thread.MIN_PRIORITY);
+ t.start();
}
/**
@@ -195,37 +198,17 @@
*/
public void storePage(String sessionId, Page page)
{
- List list = (List)storePageMap.get(sessionId);
+ List list = (List)pagesToBeSerialized.get(sessionId);
if (list == null)
{
- list = Collections.synchronizedList(new LinkedList());
- }
- // if the pagemap falls behind, directly serialize it here.
- // this check is now really wrong!! We should use a complete
counter..
- if (pagesInMap > 25)
- {
- SessionPageKey key = new SessionPageKey(sessionId,
page);
- byte[] bytes = serializePage(key, page);
- if (bytes != null)
- {
- key.setObject(bytes);
- list.add(key);
- // do really put it back in.. The writer thread
could have removed it.
- storePageMap.put(sessionId, list);
- }
-
+ list = new LinkedList();
}
- else
+ synchronized (list)
{
list.add(new SessionPageKey(sessionId, page));
- // do really put it back in.. The writer thread could
have removed it.
- storePageMap.put(sessionId, list);
- synchronized (storePageMap)
- {
- pagesInMap++;
- storePageMap.notifyAll();
- }
}
+ // do really put it back in.. The writer thread could have
removed it.
+ pagesToBeSerialized.put(sessionId, list);
}
/**
@@ -244,12 +227,18 @@
*/
public void destroy()
{
- thread.stop();
+ saveThread.stop();
+ serThread.stop();
}
private byte[] testMap(SessionPageKey currentKey)
{
- List list = (List)storePageMap.get(currentKey.sessionId);
+ SessionPageKey previousPage =
(SessionPageKey)pagesToBeSaved.get(currentKey);
+ if (previousPage != null)
+ {
+ return (byte[])previousPage.data;
+ }
+ List list = (List)pagesToBeSerialized.get(currentKey.sessionId);
if (list == null) return null;
@@ -264,10 +253,6 @@
{
list.remove(index);
}
- else if (object instanceof byte[])
- {
- return (byte[])object;
- }
else if (object == SERIALIZING)
{
try
@@ -285,6 +270,11 @@
}
else
{
+ previousPage =
(SessionPageKey)pagesToBeSaved.get(currentKey);
+ if (previousPage != null)
+ {
+ return
(byte[])previousPage.data;
+ }
return null;
}
}
@@ -295,15 +285,11 @@
}
}
- synchronized (storePageMap)
- {
- pagesInMap--;
- }
byte[] bytes = serializePage(currentKey, (Page)currentKey.data);
if (bytes != null)
{
currentKey.setObject(bytes);
- list.add(currentKey);
+ pagesToBeSaved.put(currentKey,currentKey);
}
return bytes;
}
@@ -319,7 +305,8 @@
private void removeSessionFromPendingMap(String sessionId)
{
- storePageMap.remove(sessionId);
+ pagesToBeSerialized.remove(sessionId);
+ // TODO remove from pagesToBeSaved..
removeSession(sessionId);
}
@@ -350,7 +337,7 @@
*/
private void removePageFromPendingMap(String sessionId, int id)
{
- List list = (List)storePageMap.get(sessionId);
+ List list = (List)pagesToBeSerialized.get(sessionId);
if (list == null) return;
@@ -366,6 +353,7 @@
}
}
}
+ // TODO remove from pages to be saved
removePage(sessionId, id);
}
@@ -420,62 +408,15 @@
byte[] bytes = Objects.objectToByteArray(page);
totalSerializationTime += (System.currentTimeMillis() - t1);
serialized++;
- return bytes;
- }
-
- /**
- * @param sessionId
- * @param key
- * @param bytes
- */
- private void savePage(SessionPageKey key, byte[] bytes)
- {
- File sessionDir = new File(getWorkDir(), key.sessionId);
- sessionDir.mkdirs();
- File pageFile = getPageFile(key, sessionDir);
-
- FileOutputStream fos = null;
- long t1 = System.currentTimeMillis();
- int length = 0;
- try
- {
- fos = new FileOutputStream(pageFile);
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- fos.getChannel().write(bb);
- length = bytes.length;
- }
- catch (Exception e)
- {
- log.error("Error saving page " + key.pageClass + " [" +
key.id + ","
- + key.versionNumber + "] for the
sessionid " + key.sessionId);
- }
- finally
- {
- try
- {
- if (fos != null)
- {
- fos.close();
- }
- }
- catch (IOException ex)
- {
- // ignore
- }
- }
- long t3 = System.currentTimeMillis();
if (log.isDebugEnabled())
{
- log.debug("storing page " + key.pageClass + "[" +
key.id + "," + key.versionNumber
- + "] size: " + length + " for session "
+ key.sessionId + " took " + (t3 - t1)
- + " miliseconds to save");
- }
- totalSavingTime += (t3 - t1);
- saved++;
- bytesSaved += length;
+ log.debug("serializing page " + key.pageClass + "[" +
key.id + "," + key.versionNumber
+ + "] size: " + bytes.length + " for
session " + key.sessionId + " took " + (System.currentTimeMillis() - t1)
+ + " miliseconds to serialize");
+ }
+ return bytes;
}
-
private class SessionPageKey
{
private final String sessionId;
@@ -566,6 +507,9 @@
private class PageSavingThread implements Runnable
{
private volatile boolean stop = false;
+ private long totalSavingTime = 0;
+ private int saved;
+ private int bytesSaved;
/**
* Stops this thread.
@@ -575,13 +519,7 @@
System.err.println("Total time in saving: " +
totalSavingTime);
System.err.println("Bytes saved: " + bytesSaved);
System.err.println("Pages saved: " + saved);
- System.err.println("Total time in serialization: " +
totalSerializationTime);
- System.err.println("Pages serialized: " + serialized);
- synchronized (storePageMap)
- {
- stop = true;
- storePageMap.notifyAll();
- }
+ stop = true;
}
/**
@@ -593,43 +531,158 @@
{
try
{
- synchronized (storePageMap)
+ while (pagesToBeSaved.size() == 0)
{
+ Thread.sleep(2000);
if (stop)
return;
- while (storePageMap.size() == 0)
+ }
+// if ( pagesToBeSaved.size() > 100)
+// {
+// System.err.println("max");
+//
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
+// }
+// else if ( pagesToBeSaved.size() > 25)
+// {
+// System.err.println("normal");
+//
Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
+// }
+// else
+// {
+// System.err.println("min");
+//
Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
+// }
+ Iterator it =
pagesToBeSaved.entrySet().iterator();
+ while (it.hasNext())
+ {
+ Map.Entry entry =
(Entry)it.next();
+ SessionPageKey key =
(SessionPageKey)entry.getKey();
+ if (key.data instanceof byte[])
{
- storePageMap.wait();
+ savePage(key,
(byte[])key.data);
}
+ it.remove();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Error in page save thread",
e);
+ }
+ }
+ }
+
+
+ /**
+ * @param sessionId
+ * @param key
+ * @param bytes
+ */
+ private void savePage(SessionPageKey key, byte[] bytes)
+ {
+ File sessionDir = new File(getWorkDir(), key.sessionId);
+ sessionDir.mkdirs();
+ File pageFile = getPageFile(key, sessionDir);
+
+ FileOutputStream fos = null;
+ long t1 = System.currentTimeMillis();
+ int length = 0;
+ try
+ {
+ fos = new FileOutputStream(pageFile);
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ fos.getChannel().write(bb);
+ length = bytes.length;
+ }
+ catch (Exception e)
+ {
+ log.error("Error saving page " + key.pageClass
+ " [" + key.id + ","
+ + key.versionNumber + "] for
the sessionid " + key.sessionId);
+ }
+ finally
+ {
+ try
+ {
+ if (fos != null)
+ {
+ fos.close();
+ }
+ }
+ catch (IOException ex)
+ {
+ // ignore
+ }
+ }
+ long t3 = System.currentTimeMillis();
+ if (log.isDebugEnabled())
+ {
+ log.debug("storing page " + key.pageClass + "["
+ key.id + "," + key.versionNumber
+ + "] size: " + length + " for
session " + key.sessionId + " took " + (t3 - t1)
+ + " miliseconds to save");
+ }
+ totalSavingTime += (t3 - t1);
+ saved++;
+ bytesSaved += length;
+ }
+
+ }
+
+ private class PageSerializingThread implements Runnable
+ {
+ private volatile boolean stop = false;
+
+ private int serializedInThread = 0;
+
+ /**
+ * Stops this thread.
+ */
+ public void stop()
+ {
+ System.err.println("Total time in serialization: " +
totalSerializationTime);
+ System.err.println("Total Pages serialized: " +
serialized);
+ System.err.println("Pages serialized by thread: " +
serializedInThread);
+ stop = true;
+ }
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run()
+ {
+ while (!stop)
+ {
+ try
+ {
+ while (pagesToBeSerialized.size() == 0)
+ {
+ Thread.sleep(2000);
+ if (stop)
+ return;
}
- Iterator it =
storePageMap.entrySet().iterator();
+ Iterator it =
pagesToBeSerialized.entrySet().iterator();
outer : while (it.hasNext())
{
Map.Entry entry =
(Entry)it.next();
List sessionList =
(List)entry.getValue();
while (true)
{
- byte[] pageBytes = null;
- Object data = null;
+ Page page = null;
SessionPageKey key =
null;
synchronized
(sessionList)
{
if
(sessionList.size() != 0)
{
key =
(SessionPageKey)sessionList.get(0);
- data =
key.data;
- if
(data instanceof Page)
+ if
(key.data instanceof Page)
{
+
page = (Page)key.data;
key.setObject(SERIALIZING);
}
- else if
(data instanceof byte[])
- {
-
pageBytes = (byte[])data;
- }
else
{
sessionList.remove(0);
+
System.err.println("shouldn't happen");
+
continue;
}
}
// no key found
in the current list.
@@ -638,33 +691,20 @@
// the
list is removed now!
// but
it could be that a request add something to the list now.
//
thats why a request has to check it again.
-
storePageMap.remove(entry.getKey());
+
pagesToBeSerialized.remove(entry.getKey());
continue outer;
}
}
- if (data instanceof
Page)
- {
- pageBytes =
serializePage(key, (Page)data);
- synchronized
(sessionList)
- {
-
key.setObject(pageBytes);
-
sessionList.notifyAll();
- }
- synchronized
(storePageMap)
- {
-
pagesInMap--;
- }
- }
-
- if (pageBytes != null)
+ byte[] pageBytes =
serializePage(key, page);
+ serializedInThread++;
+ synchronized
(sessionList)
{
- savePage(key,
pageBytes);
+
key.setObject(pageBytes);
+
sessionList.remove(key);
+
sessionList.notifyAll();
}
- sessionList.remove(key);
- key = null;
- pageBytes = null;
- data = null;
+ pagesToBeSaved.put(key,
key);
}
}
}
