Revert "https://issues.apache.org/jira/browse/AMQ-3758"
This reverts commit fc244f48e48596c668a7d9dc3b84c26e60693823. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3424e04f Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3424e04f Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3424e04f Branch: refs/heads/activemq-5.10.x Commit: 3424e04fad09ef69eaa72715669ebba40bb45019 Parents: 76357bd Author: Hadrian Zbarcea <hadr...@apache.org> Authored: Wed Dec 24 17:59:47 2014 -0500 Committer: Hadrian Zbarcea <hadr...@apache.org> Committed: Wed Dec 24 17:59:47 2014 -0500 ---------------------------------------------------------------------- .../apache/activemq/broker/BrokerService.java | 25 +- .../activemq/broker/jmx/JobSchedulerView.java | 56 +- .../broker/jmx/JobSchedulerViewMBean.java | 113 +- .../apache/activemq/broker/scheduler/Job.java | 23 +- .../activemq/broker/scheduler/JobListener.java | 16 +- .../activemq/broker/scheduler/JobScheduler.java | 33 +- .../broker/scheduler/JobSchedulerFacade.java | 6 - .../broker/scheduler/JobSchedulerStore.java | 43 - .../activemq/broker/scheduler/JobSupport.java | 5 +- .../activemq/store/PersistenceAdapter.java | 119 +- .../store/memory/MemoryPersistenceAdapter.java | 36 +- .../java/org/apache/activemq/util/IOHelper.java | 68 +- .../store/jdbc/JDBCPersistenceAdapter.java | 7 - .../journal/JournalPersistenceAdapter.java | 71 +- .../store/kahadb/AbstractKahaDBMetaData.java | 57 - .../store/kahadb/AbstractKahaDBStore.java | 745 ------------ .../activemq/store/kahadb/KahaDBMetaData.java | 135 --- .../store/kahadb/KahaDBPersistenceAdapter.java | 15 +- .../activemq/store/kahadb/KahaDBStore.java | 55 +- .../kahadb/MultiKahaDBPersistenceAdapter.java | 56 +- .../kahadb/MultiKahaDBTransactionStore.java | 18 +- .../activemq/store/kahadb/TempKahaDBStore.java | 138 +-- .../apache/activemq/store/kahadb/Visitor.java | 20 - .../store/kahadb/scheduler/JobImpl.java | 21 +- .../store/kahadb/scheduler/JobLocation.java | 77 +- .../scheduler/JobLocationsMarshaller.java | 53 - .../kahadb/scheduler/JobSchedulerImpl.java | 837 ++++++-------- .../scheduler/JobSchedulerKahaDBMetaData.java | 246 ---- .../kahadb/scheduler/JobSchedulerStoreImpl.java | 1076 +++++------------- .../scheduler/UnknownStoreVersionException.java | 24 - .../kahadb/scheduler/legacy/LegacyJobImpl.java | 72 -- .../scheduler/legacy/LegacyJobLocation.java | 296 ----- .../legacy/LegacyJobSchedulerImpl.java | 222 ---- .../legacy/LegacyJobSchedulerStoreImpl.java | 378 ------ .../scheduler/legacy/LegacyStoreReplayer.java | 155 --- .../src/main/proto/journal-data.proto | 61 - .../apache/activemq/leveldb/LevelDBStore.scala | 5 - .../leveldb/replicated/ProxyLevelDBStore.scala | 5 - .../JobSchedulerBrokerShutdownTest.java | 1 - .../JobSchedulerJmxManagementTests.java | 155 --- .../scheduler/JobSchedulerManagementTest.java | 84 +- .../JobSchedulerStoreCheckpointTest.java | 125 -- .../broker/scheduler/JobSchedulerStoreTest.java | 46 +- .../broker/scheduler/JobSchedulerTest.java | 36 - .../scheduler/JobSchedulerTestSupport.java | 112 -- .../KahaDBSchedulerIndexRebuildTest.java | 179 --- .../KahaDBSchedulerMissingJournalLogsTest.java | 204 ---- .../scheduler/SchedulerDBVersionTest.java | 164 --- .../src/test/resources/log4j.properties | 1 - .../activemq/store/schedulerDB/legacy/db-1.log | Bin 524288 -> 0 bytes .../store/schedulerDB/legacy/scheduleDB.data | Bin 20480 -> 0 bytes .../store/schedulerDB/legacy/scheduleDB.redo | Bin 16408 -> 0 bytes 52 files changed, 911 insertions(+), 5584 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 5becec2..00d4abd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -1861,23 +1861,6 @@ public class BrokerService implements Service { try { PersistenceAdapter pa = getPersistenceAdapter(); - if (pa != null) { - this.jobSchedulerStore = pa.createJobSchedulerStore(); - jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); - configureService(jobSchedulerStore); - jobSchedulerStore.start(); - return this.jobSchedulerStore; - } - } catch (IOException e) { - throw new RuntimeException(e); - } catch (UnsupportedOperationException ex) { - // It's ok if the store doesn't implement a scheduler. - } catch (Exception e) { - throw new RuntimeException(e); - } - - try { - PersistenceAdapter pa = getPersistenceAdapter(); if (pa != null && pa instanceof JobSchedulerStore) { this.jobSchedulerStore = (JobSchedulerStore) pa; configureService(jobSchedulerStore); @@ -1887,13 +1870,9 @@ public class BrokerService implements Service { throw new RuntimeException(e); } - // Load the KahaDB store as a last resort, this only works if KahaDB is - // included at runtime, otherwise this will fail. User should disable - // scheduler support if this fails. try { - String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"; - PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance(); - jobSchedulerStore = adaptor.createJobSchedulerStore(); + String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl"; + jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance(); jobSchedulerStore.setDirectory(getSchedulerDirectoryFile()); configureService(jobSchedulerStore); jobSchedulerStore.start(); http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java index 2118a96..9e5a1fb 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java @@ -16,39 +16,23 @@ */ package org.apache.activemq.broker.jmx; -import java.util.List; - -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.scheduler.Job; import org.apache.activemq.broker.scheduler.JobScheduler; import org.apache.activemq.broker.scheduler.JobSupport; -/** - * MBean object that can be used to manage a single instance of a JobScheduler. The object - * provides methods for querying for jobs and removing some or all of the jobs that are - * scheduled in the managed store. - */ +import javax.management.openmbean.*; +import java.io.IOException; +import java.util.List; + public class JobSchedulerView implements JobSchedulerViewMBean { private final JobScheduler jobScheduler; - /** - * Creates a new instance of the JobScheduler management MBean. - * - * @param jobScheduler - * The scheduler instance to manage. - */ public JobSchedulerView(JobScheduler jobScheduler) { this.jobScheduler = jobScheduler; } - @Override public TabularData getAllJobs() throws Exception { OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); CompositeType ct = factory.getCompositeType(); @@ -61,7 +45,6 @@ public class JobSchedulerView implements JobSchedulerViewMBean { return rc; } - @Override public TabularData getAllJobs(String startTime, String finishTime) throws Exception { OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); CompositeType ct = factory.getCompositeType(); @@ -76,7 +59,6 @@ public class JobSchedulerView implements JobSchedulerViewMBean { return rc; } - @Override public TabularData getNextScheduleJobs() throws Exception { OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class); CompositeType ct = factory.getCompositeType(); @@ -89,51 +71,31 @@ public class JobSchedulerView implements JobSchedulerViewMBean { return rc; } - @Override public String getNextScheduleTime() throws Exception { long time = this.jobScheduler.getNextScheduleTime(); return JobSupport.getDateTime(time); } - @Override public void removeAllJobs() throws Exception { this.jobScheduler.removeAllJobs(); + } - @Override public void removeAllJobs(String startTime, String finishTime) throws Exception { long start = JobSupport.getDataTime(startTime); long finish = JobSupport.getDataTime(finishTime); this.jobScheduler.removeAllJobs(start, finish); - } - @Override - public void removeAllJobsAtScheduledTime(String time) throws Exception { - long removeAtTime = JobSupport.getDataTime(time); - this.jobScheduler.remove(removeAtTime); } - @Override - public void removeJobAtScheduledTime(String time) throws Exception { - removeAllJobsAtScheduledTime(time); - } - - @Override public void removeJob(String jobId) throws Exception { this.jobScheduler.remove(jobId); - } - @Override - public int getExecutionCount(String jobId) throws Exception { - int result = 0; + } - List<Job> jobs = this.jobScheduler.getAllJobs(); - for (Job job : jobs) { - if (job.getJobId().equals(jobId)) { - result = job.getExecutionCount(); - } - } + public void removeJobAtScheduledTime(String time) throws IOException { + // TODO Auto-generated method stub - return result; } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java index 76a7926..f5745ea 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java @@ -18,125 +18,76 @@ package org.apache.activemq.broker.jmx; import javax.management.openmbean.TabularData; -public interface JobSchedulerViewMBean { - /** - * Remove all jobs scheduled to run at this time. If there are no jobs scheduled - * at the given time this methods returns without making any modifications to the - * scheduler store. - * - * @param time - * the string formated time that should be used to remove jobs. - * - * @throws Exception if an error occurs while performing the remove. - * - * @deprecated use removeAllJobsAtScheduledTime instead as it is more explicit about what - * the method is actually doing. - */ - @Deprecated - @MBeanInfo("remove jobs with matching execution time") - public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception; +public interface JobSchedulerViewMBean { /** - * Remove all jobs scheduled to run at this time. If there are no jobs scheduled - * at the given time this methods returns without making any modifications to the - * scheduler store. - * + * remove all jobs scheduled to run at this time * @param time - * the string formated time that should be used to remove jobs. - * - * @throws Exception if an error occurs while performing the remove. + * @throws Exception */ @MBeanInfo("remove jobs with matching execution time") - public abstract void removeAllJobsAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception; + public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception; /** - * Remove a job with the matching jobId. If the method does not find a matching job - * then it returns without throwing an error or making any modifications to the job - * scheduler store. - * + * remove a job with the matching jobId * @param jobId - * the Job Id to remove from the scheduler store. - * - * @throws Exception if an error occurs while attempting to remove the Job. + * @throws Exception */ @MBeanInfo("remove jobs with matching jobId") public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws Exception; - + /** - * Remove all the Jobs from the scheduler, - * - * @throws Exception if an error occurs while purging the store. + * remove all the Jobs from the scheduler + * @throws Exception */ @MBeanInfo("remove all scheduled jobs") public abstract void removeAllJobs() throws Exception; - + /** - * Remove all the Jobs from the scheduler that are due between the start and finish times. - * - * @param start - * the starting time to remove jobs from. - * @param finish - * the finish time for the remove operation. - * - * @throws Exception if an error occurs while attempting to remove the jobs. + * remove all the Jobs from the scheduler that are due between the start and finish times + * @param start time + * @param finish time + * @throws Exception */ @MBeanInfo("remove all scheduled jobs between time ranges ") public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception; + + /** - * Get the next time jobs will be fired from this scheduler store. - * - * @return the time in milliseconds of the next job to execute. - * - * @throws Exception if an error occurs while accessing the store. + * Get the next time jobs will be fired + * @return the time in milliseconds + * @throws Exception */ @MBeanInfo("get the next time a job is due to be scheduled ") public abstract String getNextScheduleTime() throws Exception; - - /** - * Gets the number of times a scheduled Job has been executed. - * - * @return the total number of time a scheduled job has executed. - * - * @throws Exception if an error occurs while querying for the Job. - */ - @MBeanInfo("get the next time a job is due to be scheduled ") - public abstract int getExecutionCount(@MBeanInfo("jobId")String jobId) throws Exception; - + /** - * Get all the jobs scheduled to run next. - * + * Get all the jobs scheduled to run next * @return a list of jobs that will be scheduled next - * - * @throws Exception if an error occurs while reading the scheduler store. + * @throws Exception */ @MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ") public abstract TabularData getNextScheduleJobs() throws Exception; - + /** - * Get all the outstanding Jobs that are scheduled in this scheduler store. - * - * @return a table of all jobs in this scheduler store. - * - * @throws Exception if an error occurs while reading the store. + * Get all the outstanding Jobs + * @return a table of all jobs + * @throws Exception + */ @MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ") public abstract TabularData getAllJobs() throws Exception; - + /** - * Get all outstanding jobs due to run between start and finish time range. - * + * Get all outstanding jobs due to run between start and finish * @param start - * the starting time range to query the store for jobs. * @param finish - * the ending time of this query for scheduled jobs. - * - * @return a table of jobs in the range given. - * - * @throws Exception if an error occurs while querying the scheduler store. + * @return a table of jobs in the range + * @throws Exception + */ @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ") public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception; - } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java index 047fe23..7b28a5b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java @@ -16,12 +16,7 @@ */ package org.apache.activemq.broker.scheduler; -/** - * Interface for a scheduled Job object. - * - * Each Job is identified by a unique Job Id which can be used to reference the Job - * in the Job Scheduler store for updates or removal. - */ + public interface Job { /** @@ -43,12 +38,11 @@ public interface Job { * @return the Delay */ public abstract long getDelay(); - /** * @return the period */ public abstract long getPeriod(); - + /** * @return the cron entry */ @@ -58,24 +52,17 @@ public interface Job { * @return the payload */ public abstract byte[] getPayload(); - + /** * Get the start time as a Date time string * @return the date time */ public String getStartTime(); - + /** - * Get the time the job is next due to execute + * Get the time the job is next due to execute * @return the date time */ public String getNextExecutionTime(); - /** - * Gets the total number of times this job has executed. - * - * @returns the number of times this job has been executed. - */ - public int getExecutionCount(); - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java index a453595..c53d9c6 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java @@ -18,21 +18,13 @@ package org.apache.activemq.broker.scheduler; import org.apache.activemq.util.ByteSequence; -/** - * Job event listener interface. Provides event points for Job related events - * such as job ready events. - */ public interface JobListener { - + /** - * A Job that has been scheduled is now ready to be fired. The Job is passed - * in its raw byte form and must be un-marshaled before being delivered. - * - * @param jobId - * The unique Job Id of the Job that is ready to fire. + * A Job that has been scheduled is now ready + * @param id * @param job - * The job that is now ready, delivered in byte form. */ - public void scheduledJob(String id, ByteSequence job); + public void scheduledJob(String id,ByteSequence job); } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java index e951861..2e96eae 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java @@ -46,25 +46,20 @@ public interface JobScheduler { void stopDispatching() throws Exception; /** - * Add a Job listener which will receive events related to scheduled jobs. - * - * @param listener - * The job listener to add. + * Add a Job listener * + * @param l * @throws Exception */ - void addListener(JobListener listener) throws Exception; + void addListener(JobListener l) throws Exception; /** - * remove a JobListener that was previously registered. If the given listener is not in - * the registry this method has no effect. - * - * @param listener - * The listener that should be removed from the listener registry. + * remove a JobListener * + * @param l * @throws Exception */ - void removeListener(JobListener listener) throws Exception; + void removeListener(JobListener l) throws Exception; /** * Add a job to be scheduled @@ -75,8 +70,7 @@ public interface JobScheduler { * the message to be sent when the job is scheduled * @param delay * the time in milliseconds before the job will be run - * - * @throws Exception if an error occurs while scheduling the Job. + * @throws Exception */ void schedule(String jobId, ByteSequence payload, long delay) throws Exception; @@ -88,9 +82,8 @@ public interface JobScheduler { * @param payload * the message to be sent when the job is scheduled * @param cronEntry - * The cron entry to use to schedule this job. - * - * @throws Exception if an error occurs while scheduling the Job. + * - cron entry + * @throws Exception */ void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception; @@ -102,7 +95,7 @@ public interface JobScheduler { * @param payload * the message to be sent when the job is scheduled * @param cronEntry - * cron entry + * - cron entry * @param delay * time in ms to wait before scheduling * @param period @@ -117,8 +110,6 @@ public interface JobScheduler { * remove all jobs scheduled to run at this time * * @param time - * The UTC time to use to remove a batch of scheduled Jobs. - * * @throws Exception */ void remove(long time) throws Exception; @@ -127,9 +118,7 @@ public interface JobScheduler { * remove a job with the matching jobId * * @param jobId - * The unique Job Id to search for and remove from the scheduled set of jobs. - * - * @throws Exception if an error occurs while removing the Job. + * @throws Exception */ void remove(String jobId) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java index 24a216a..d46d04a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java @@ -21,12 +21,6 @@ import java.util.List; import org.apache.activemq.util.ByteSequence; -/** - * A wrapper for instances of the JobScheduler interface that ensures that methods - * provides safe and sane return values and can deal with null values being passed - * in etc. Provides a measure of safety when using unknown implementations of the - * JobSchedulerStore which might not always do the right thing. - */ public class JobSchedulerFacade implements JobScheduler { private final SchedulerBroker broker; http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java index c6863c7..3cbc367 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java @@ -26,56 +26,13 @@ import org.apache.activemq.Service; */ public interface JobSchedulerStore extends Service { - /** - * Gets the location where the Job Scheduler will write the persistent data used - * to preserve and recover scheduled Jobs. - * - * If the scheduler implementation does not utilize a file system based store this - * method returns null. - * - * @return the directory where persistent store data is written. - */ File getDirectory(); - /** - * Sets the directory where persistent store data will be written. This method - * must be called before the scheduler store is started to have any effect. - * - * @param directory - * The directory where the job scheduler store is to be located. - */ void setDirectory(File directory); - /** - * The size of the current store on disk if the store utilizes a disk based store - * mechanism. - * - * @return the current store size on disk. - */ long size(); - /** - * Returns the JobScheduler instance identified by the given name. - * - * @param name - * the name of the JobScheduler instance to lookup. - * - * @return the named JobScheduler or null if none exists with the given name. - * - * @throws Exception if an error occurs while loading the named scheduler. - */ JobScheduler getJobScheduler(String name) throws Exception; - /** - * Removes the named JobScheduler if it exists, purging all scheduled messages - * assigned to it. - * - * @param name - * the name of the scheduler instance to remove. - * - * @return true if there was a scheduler with the given name to remove. - * - * @throws Exception if an error occurs while removing the scheduler. - */ boolean removeJobScheduler(String name) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java index fc5b8dd..6b78d77 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java @@ -20,11 +20,7 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; -/** - * A class to provide common Job Scheduler related methods. - */ public class JobSupport { - public static String getDateTime(long value) { DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(value); @@ -36,4 +32,5 @@ public class JobSupport { Date date = dfm.parse(value); return date.getTime(); } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java index 01a9634..31efd32 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java @@ -22,7 +22,6 @@ import java.util.Set; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -32,99 +31,74 @@ import org.apache.activemq.usage.SystemUsage; /** * Adapter to the actual persistence mechanism used with ActiveMQ * - * + * */ public interface PersistenceAdapter extends Service { /** - * Returns a set of all the - * {@link org.apache.activemq.command.ActiveMQDestination} objects that the - * persistence store is aware exist. + * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination} + * objects that the persistence store is aware exist. * * @return active destinations */ Set<ActiveMQDestination> getDestinations(); /** - * Factory method to create a new queue message store with the given - * destination name - * + * Factory method to create a new queue message store with the given destination name * @param destination * @return the message store - * @throws IOException + * @throws IOException */ MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException; /** - * Factory method to create a new topic message store with the given - * destination name - * - * @param destination + * Factory method to create a new topic message store with the given destination name + * @param destination * @return the topic message store - * @throws IOException + * @throws IOException */ TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException; /** - * Creates and returns a new Job Scheduler store instance. - * - * @return a new JobSchedulerStore instance if this Persistence adapter provides its own. - * - * @throws IOException If an error occurs while creating the new JobSchedulerStore. - * @throws UnsupportedOperationException If this adapter does not provide its own - * scheduler store implementation. - */ - JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException; - - /** * Cleanup method to remove any state associated with the given destination. * This method does not stop the message store (it might not be cached). - * - * @param destination - * Destination to forget + * @param destination Destination to forget */ void removeQueueMessageStore(ActiveMQQueue destination); /** * Cleanup method to remove any state associated with the given destination * This method does not stop the message store (it might not be cached). - * - * @param destination - * Destination to forget + * @param destination Destination to forget */ void removeTopicMessageStore(ActiveMQTopic destination); /** - * Factory method to create a new persistent prepared transaction store for - * XA recovery - * + * Factory method to create a new persistent prepared transaction store for XA recovery * @return transaction store - * @throws IOException + * @throws IOException */ TransactionStore createTransactionStore() throws IOException; /** - * This method starts a transaction on the persistent storage - which is - * nothing to do with JMS or XA transactions - its purely a mechanism to - * perform multiple writes to a persistent store in 1 transaction as a - * performance optimization. + * This method starts a transaction on the persistent storage - which is nothing to + * do with JMS or XA transactions - its purely a mechanism to perform multiple writes + * to a persistent store in 1 transaction as a performance optimization. * <p/> - * Typically one transaction will require one disk synchronization point and - * so for real high performance its usually faster to perform many writes - * within the same transaction to minimize latency caused by disk - * synchronization. This is especially true when using tools like Berkeley - * Db or embedded JDBC servers. - * - * @param context - * @throws IOException + * Typically one transaction will require one disk synchronization point and so for + * real high performance its usually faster to perform many writes within the same + * transaction to minimize latency caused by disk synchronization. This is especially + * true when using tools like Berkeley Db or embedded JDBC servers. + * @param context + * @throws IOException */ void beginTransaction(ConnectionContext context) throws IOException; + /** * Commit a persistence transaction - * - * @param context - * @throws IOException + * @param context + * @throws IOException * * @see PersistenceAdapter#beginTransaction(ConnectionContext context) */ @@ -132,45 +106,40 @@ public interface PersistenceAdapter extends Service { /** * Rollback a persistence transaction - * - * @param context - * @throws IOException + * @param context + * @throws IOException * * @see PersistenceAdapter#beginTransaction(ConnectionContext context) */ void rollbackTransaction(ConnectionContext context) throws IOException; - + /** - * + * * @return last broker sequence * @throws IOException */ long getLastMessageBrokerSequenceId() throws IOException; - + /** * Delete's all the messages in the persistent store. - * + * * @throws IOException */ void deleteAllMessages() throws IOException; - + /** - * @param usageManager - * The UsageManager that is controlling the broker's memory - * usage. + * @param usageManager The UsageManager that is controlling the broker's memory usage. */ void setUsageManager(SystemUsage usageManager); - + /** * Set the name of the broker using the adapter - * * @param brokerName */ void setBrokerName(String brokerName); - + /** * Set the directory where any data files should be created - * * @param dir */ void setDirectory(File dir); @@ -179,30 +148,26 @@ public interface PersistenceAdapter extends Service { * @return the directory used by the persistence adaptor */ File getDirectory(); - + /** * checkpoint any - * - * @param sync - * @throws IOException + * @param sync + * @throws IOException * */ void checkpoint(boolean sync) throws IOException; - + /** * A hint to return the size of the store on disk - * * @return disk space used in bytes of 0 if not implemented */ long size(); /** - * return the last stored producer sequenceId for this producer Id used to - * suppress duplicate sends on failover reconnect at the transport when a - * reconnect occurs - * - * @param id - * the producerId to find a sequenceId for + * return the last stored producer sequenceId for this producer Id + * used to suppress duplicate sends on failover reconnect at the transport + * when a reconnect occurs + * @param id the producerId to find a sequenceId for * @return the last stored sequence id or -1 if no suppression needed */ long getLastProducerSequenceId(ProducerId id) throws IOException; http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index 73ea104..0fd6bfc 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -40,7 +39,7 @@ import org.slf4j.LoggerFactory; /** * @org.apache.xbean.XBean - * + * */ public class MemoryPersistenceAdapter implements PersistenceAdapter { private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class); @@ -50,7 +49,6 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>(); private boolean useExternalMessageReferences; - @Override public Set<ActiveMQDestination> getDestinations() { Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size()); for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) { @@ -66,7 +64,6 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { return new MemoryPersistenceAdapter(); } - @Override public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { MessageStore rc = queues.get(destination); if (rc == null) { @@ -79,7 +76,6 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { return rc; } - @Override public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { TopicMessageStore rc = topics.get(destination); if (rc == null) { @@ -97,7 +93,6 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { * * @param destination Destination to forget */ - @Override public void removeQueueMessageStore(ActiveMQQueue destination) { queues.remove(destination); } @@ -107,12 +102,10 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { * * @param destination Destination to forget */ - @Override public void removeTopicMessageStore(ActiveMQTopic destination) { topics.remove(destination); } - @Override public TransactionStore createTransactionStore() throws IOException { if (transactionStore == null) { transactionStore = new MemoryTransactionStore(this); @@ -120,32 +113,25 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { return transactionStore; } - @Override public void beginTransaction(ConnectionContext context) { } - @Override public void commitTransaction(ConnectionContext context) { } - @Override public void rollbackTransaction(ConnectionContext context) { } - @Override public void start() throws Exception { } - @Override public void stop() throws Exception { } - @Override public long getLastMessageBrokerSequenceId() throws IOException { return 0; } - @Override public void deleteAllMessages() throws IOException { for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) { MemoryMessageStore store = asMemoryMessageStore(iter.next()); @@ -191,52 +177,38 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { * @param usageManager The UsageManager that is controlling the broker's * memory usage. */ - @Override public void setUsageManager(SystemUsage usageManager) { } - @Override public String toString() { return "MemoryPersistenceAdapter"; } - @Override public void setBrokerName(String brokerName) { } - @Override public void setDirectory(File dir) { } - - @Override + public File getDirectory(){ return null; } - @Override public void checkpoint(boolean sync) throws IOException { } - - @Override + public long size(){ return 0; } - + public void setCreateTransactionStore(boolean create) throws IOException { if (create) { createTransactionStore(); } } - @Override public long getLastProducerSequenceId(ProducerId id) { // memory map does duplicate suppression return -1; } - - @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { - // We could eventuall implement an in memory scheduler. - throw new UnsupportedOperationException(); - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java index 2a70194..a623de9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java @@ -61,9 +61,8 @@ public final class IOHelper { } /** - * Converts any string into a string that is safe to use as a file name. The - * result will only include ascii characters and numbers, and the "-","_", - * and "." characters. + * Converts any string into a string that is safe to use as a file name. + * The result will only include ascii characters and numbers, and the "-","_", and "." characters. * * @param name * @return @@ -77,16 +76,15 @@ public final class IOHelper { } /** - * Converts any string into a string that is safe to use as a file name. The - * result will only include ascii characters and numbers, and the "-","_", - * and "." characters. + * Converts any string into a string that is safe to use as a file name. + * The result will only include ascii characters and numbers, and the "-","_", and "." characters. * * @param name * @param dirSeparators * @param maxFileLength * @return */ - public static String toFileSystemSafeName(String name, boolean dirSeparators, int maxFileLength) { + public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) { int size = name.length(); StringBuffer rc = new StringBuffer(size * 2); for (int i = 0; i < size; i++) { @@ -94,7 +92,8 @@ public final class IOHelper { boolean valid = c >= 'a' && c <= 'z'; valid = valid || (c >= 'A' && c <= 'Z'); valid = valid || (c >= '0' && c <= '9'); - valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '#') || (dirSeparators && ((c == '/') || (c == '\\'))); + valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#') + ||(dirSeparators && ( (c == '/') || (c == '\\'))); if (valid) { rc.append(c); @@ -106,7 +105,7 @@ public final class IOHelper { } String result = rc.toString(); if (result.length() > maxFileLength) { - result = result.substring(result.length() - maxFileLength, result.length()); + result = result.substring(result.length()-maxFileLength,result.length()); } return result; } @@ -169,7 +168,8 @@ public final class IOHelper { } else { for (int i = 0; i < files.length; i++) { File file = files[i]; - if (file.getName().equals(".") || file.getName().equals("..")) { + if (file.getName().equals(".") + || file.getName().equals("..")) { continue; } if (file.isDirectory()) { @@ -190,27 +190,6 @@ public final class IOHelper { } } - public static void moveFiles(File srcDirectory, File targetDirectory, FilenameFilter filter) throws IOException { - if (!srcDirectory.isDirectory()) { - throw new IOException("source is not a directory"); - } - - if (targetDirectory.exists() && !targetDirectory.isDirectory()) { - throw new IOException("target exists and is not a directory"); - } else { - mkdirs(targetDirectory); - } - - List<File> filesToMove = new ArrayList<File>(); - getFiles(srcDirectory, filesToMove, filter); - - for (File file : filesToMove) { - if (!file.isDirectory()) { - moveFile(file, targetDirectory); - } - } - } - public static void copyFile(File src, File dest) throws IOException { copyFile(src, dest, null); } @@ -243,32 +222,32 @@ public final class IOHelper { File parent = src.getParentFile(); String fromPath = from.getAbsolutePath(); if (parent.getAbsolutePath().equals(fromPath)) { - // one level down + //one level down result = to; - } else { + }else { String parentPath = parent.getAbsolutePath(); String path = parentPath.substring(fromPath.length()); - result = new File(to.getAbsolutePath() + File.separator + path); + result = new File(to.getAbsolutePath()+File.separator+path); } return result; } - static List<File> getFiles(File dir, FilenameFilter filter) { + static List<File> getFiles(File dir,FilenameFilter filter){ List<File> result = new ArrayList<File>(); - getFiles(dir, result, filter); + getFiles(dir,result,filter); return result; } - static void getFiles(File dir, List<File> list, FilenameFilter filter) { + static void getFiles(File dir,List<File> list,FilenameFilter filter) { if (!list.contains(dir)) { list.add(dir); - String[] fileNames = dir.list(filter); - for (int i = 0; i < fileNames.length; i++) { - File f = new File(dir, fileNames[i]); + String[] fileNames=dir.list(filter); + for (int i =0; i < fileNames.length;i++) { + File f = new File(dir,fileNames[i]); if (f.isFile()) { list.add(f); - } else { - getFiles(dir, list, filter); + }else { + getFiles(dir,list,filter); } } } @@ -307,13 +286,12 @@ public final class IOHelper { public static void mkdirs(File dir) throws IOException { if (dir.exists()) { if (!dir.isDirectory()) { - throw new IOException("Failed to create directory '" + dir + - "', regular file already existed with that name"); + throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name"); } } else { if (!dir.mkdirs()) { - throw new IOException("Failed to create directory '" + dir + "'"); + throw new IOException("Failed to create directory '" + dir+"'"); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index a3a8250..7ff4ae0 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -34,7 +34,6 @@ import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.Locker; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -423,7 +422,6 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements this.lockDataSource = dataSource; } - @Override public BrokerService getBrokerService() { return brokerService; } @@ -848,9 +846,4 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements } return result; } - - @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { - throw new UnsupportedOperationException(); - } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index cc5282f..565fc9f 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -31,7 +31,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activeio.journal.InvalidRecordLocationException; import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.JournalEventListener; @@ -41,7 +40,6 @@ import org.apache.activeio.packet.Packet; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.scheduler.JobSchedulerStore; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -80,14 +78,14 @@ import org.slf4j.LoggerFactory; * An implementation of {@link PersistenceAdapter} designed for use with a * {@link Journal} and then check pointing asynchronously on a timeout with some * other long term persistent storage. - * + * * @org.apache.xbean.XBean - * + * */ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware { private BrokerService brokerService; - + protected Scheduler scheduler; private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class); @@ -120,9 +118,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve private TaskRunnerFactory taskRunnerFactory; private File directory; - public JournalPersistenceAdapter() { + public JournalPersistenceAdapter() { } - + public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { setJournal(journal); setTaskRunnerFactory(taskRunnerFactory); @@ -137,14 +135,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve this.journal = journal; journal.setJournalEventListener(this); } - + public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) { this.longTermPersistence = longTermPersistence; } - + final Runnable createPeriodicCheckpointTask() { return new Runnable() { - @Override public void run() { long lastTime = 0; synchronized (this) { @@ -161,13 +158,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve * @param usageManager The UsageManager that is controlling the * destination's memory usage. */ - @Override public void setUsageManager(SystemUsage usageManager) { this.usageManager = usageManager; longTermPersistence.setUsageManager(usageManager); } - @Override public Set<ActiveMQDestination> getDestinations() { Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations()); destinations.addAll(queues.keySet()); @@ -183,7 +178,6 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } } - @Override public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { JournalMessageStore store = queues.get(destination); if (store == null) { @@ -194,7 +188,6 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return store; } - @Override public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException { JournalTopicMessageStore store = topics.get(destinationName); if (store == null) { @@ -210,7 +203,6 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve * * @param destination Destination to forget */ - @Override public void removeQueueMessageStore(ActiveMQQueue destination) { queues.remove(destination); } @@ -220,37 +212,30 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve * * @param destination Destination to forget */ - @Override public void removeTopicMessageStore(ActiveMQTopic destination) { topics.remove(destination); } - @Override public TransactionStore createTransactionStore() throws IOException { return transactionStore; } - @Override public long getLastMessageBrokerSequenceId() throws IOException { return longTermPersistence.getLastMessageBrokerSequenceId(); } - @Override public void beginTransaction(ConnectionContext context) throws IOException { longTermPersistence.beginTransaction(context); } - @Override public void commitTransaction(ConnectionContext context) throws IOException { longTermPersistence.commitTransaction(context); } - @Override public void rollbackTransaction(ConnectionContext context) throws IOException { longTermPersistence.rollbackTransaction(context); } - @Override public synchronized void start() throws Exception { if (!started.compareAndSet(false, true)) { return; @@ -261,14 +246,12 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { - @Override public boolean iterate() { return doCheckpoint(); } }, "ActiveMQ Journal Checkpoint Worker"); checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { - @Override public Thread newThread(Runnable runable) { Thread t = new Thread(runable, "Journal checkpoint worker"); t.setPriority(7); @@ -296,7 +279,6 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } - @Override public void stop() throws Exception { this.usageManager.getMemoryUsage().removeUsageListener(this); @@ -348,17 +330,16 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve /** * The Journal give us a call back so that we can move old data out of the * journal. Taking a checkpoint does this for us. - * + * * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation) */ - @Override public void overflowNotification(RecordLocation safeLocation) { checkpoint(false, true); } /** * When we checkpoint we move all the journalled data to long term storage. - * + * */ public void checkpoint(boolean sync, boolean fullCheckpoint) { try { @@ -388,14 +369,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } } - @Override public void checkpoint(boolean sync) { checkpoint(sync, sync); } /** * This does the actual checkpoint. - * + * * @return */ public boolean doCheckpoint() { @@ -418,7 +398,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve // We do many partial checkpoints (fullCheckpoint==false) to move // topic messages // to long term store as soon as possible. - // + // // We want to avoid doing that for queue messages since removes the // come in the same // checkpoint cycle will nullify the previous message add. @@ -431,7 +411,6 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve try { final JournalMessageStore ms = iterator.next(); FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() { - @Override public RecordLocation call() throws Exception { return ms.checkpoint(); } @@ -449,7 +428,6 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve try { final JournalTopicMessageStore ms = iterator.next(); FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() { - @Override public RecordLocation call() throws Exception { return ms.checkpoint(); } @@ -527,7 +505,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve /** * Move all the messages that were in the journal into long term storage. We * just replay and do a checkpoint. - * + * * @throws IOException * @throws IOException * @throws InvalidRecordLocationException @@ -666,11 +644,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException { if (started.get()) { try { - return journal.write(toPacket(wireFormat.marshal(command)), sync); + return journal.write(toPacket(wireFormat.marshal(command)), sync); } catch (IOException ioe) { - LOG.error("Cannot write to the journal", ioe); - brokerService.handleIOException(ioe); - throw ioe; + LOG.error("Cannot write to the journal", ioe); + brokerService.handleIOException(ioe); + throw ioe; } } throw new IOException("closed"); @@ -682,7 +660,6 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return writeCommand(trace, sync); } - @Override public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) { newPercentUsage = (newPercentUsage / 10) * 10; oldPercentUsage = (oldPercentUsage / 10) * 10; @@ -696,7 +673,6 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return transactionStore; } - @Override public void deleteAllMessages() throws IOException { try { JournalTrace trace = new JournalTrace(); @@ -759,7 +735,6 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength()); } - @Override public void setBrokerName(String brokerName) { longTermPersistence.setBrokerName(brokerName); } @@ -769,22 +744,18 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return "JournalPersistenceAdapter(" + longTermPersistence + ")"; } - @Override public void setDirectory(File dir) { this.directory=dir; } - - @Override + public File getDirectory(){ return directory; } - - @Override + public long size(){ return 0; } - @Override public void setBrokerService(BrokerService brokerService) { this.brokerService = brokerService; PersistenceAdapter pa = getLongTermPersistence(); @@ -793,14 +764,8 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } } - @Override public long getLastProducerSequenceId(ProducerId id) { return -1; } - @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { - return longTermPersistence.createJobSchedulerStore(); - } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java deleted file mode 100644 index edb2750..0000000 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadb; - -import org.apache.activemq.store.kahadb.disk.journal.Location; -import org.apache.activemq.store.kahadb.disk.page.Page; - -public abstract class AbstractKahaDBMetaData<T> implements KahaDBMetaData<T> { - - private int state; - private Location lastUpdateLocation; - private Page<T> page; - - @Override - public Page<T> getPage() { - return page; - } - - @Override - public int getState() { - return state; - } - - @Override - public Location getLastUpdateLocation() { - return lastUpdateLocation; - } - - @Override - public void setPage(Page<T> page) { - this.page = page; - } - - @Override - public void setState(int value) { - this.state = value; - } - - @Override - public void setLastUpdateLocation(Location location) { - this.lastUpdateLocation = location; - } -}