Author: gtully Date: Thu Feb 26 17:32:37 2009 New Revision: 748218 URL: http://svn.apache.org/viewvc?rev=748218&view=rev Log: resolve mem leak in kahadb - AMQ-2143
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java (with props) Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java?rev=748218&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java Thu Feb 26 17:32:37 2009 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.perf; + +import java.io.File; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBStore; + +/** + * @version $Revision$ + */ +public class KahaDBQueueTest extends SimpleQueueTest { + + protected void configureBroker(BrokerService answer,String uri) throws Exception { + + File dataFileDir = new File("target/test-amq-data/perfTest/kahadb"); + + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(dataFileDir); + + // The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified + // what happens if the index is updated but a journal update is lost. + // Index is going to be inconsistent, but can it be repaired? + kaha.setEnableJournalDiskSyncs(false); + // Using a bigger journal file size makes it take fewer spikes as it is not switching files as often. + kaha.setJournalMaxFileLength(1024*100); + + // small batch means more frequent and smaller writes + kaha.setIndexWriteBatchSize(100); + // do the index write in a separate thread + kaha.setEnableIndexWriteAsync(true); + + answer.setPersistenceAdapter(kaha); + answer.addConnector(uri); + answer.setDeleteAllMessagesOnStartup(true); + + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=748218&r1=748217&r2=748218&view=diff ============================================================================== --- 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Thu Feb 26 17:32:37 2009 @@ -175,6 +175,9 @@ // assigned // by the data manager (which is basically just appending) + if (!sync) { + inflightWrites.put(new WriteKey(location), write); + } synchronized (this) { batch = enqueue(write); } @@ -185,9 +188,8 @@ } catch (InterruptedException e) { throw new InterruptedIOException(); } - } else { - inflightWrites.put(new WriteKey(location), write); } + return location; }