Author: cziegeler Date: Wed Sep 18 08:07:13 2013 New Revision: 1524326 URL: http://svn.apache.org/r1524326 Log: SLING-3028 : Support for progress tracking of jobs
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1524326&r1=1524325&r2=1524326&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Wed Sep 18 08:07:13 2013 @@ -41,6 +41,7 @@ public class JobHandler { } public boolean start() { + this.job.prepare(); return this.jobManager.start(this); } @@ -77,4 +78,9 @@ public class JobHandler { public String toString() { return "JobHandler(" + this.job.getId() + ")"; } + + public void updateProperty(final String propName) { + this.jobManager.updateProperty(this.job, propName); + + } } \ No newline at end of file Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1524326&r1=1524325&r2=1524326&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Wed Sep 18 08:07:13 2013 @@ -18,6 +18,7 @@ */ package org.apache.sling.event.impl.jobs; +import java.text.MessageFormat; import java.util.Calendar; import java.util.List; import java.util.Map; @@ -45,6 +46,21 @@ public class JobImpl implements Job { /** Internal job property containing optional delay override. */ public static final String PROPERTY_DELAY_OVERRIDE = ":slingevent:delayOverride"; + /** Property for log statements. */ + public static final String PROPERTY_LOG = "slingevent:log"; + + /** Property for ETA. */ + public static final String PROPERTY_ETA = "slingevent:eta"; + + /** Property for Steps. */ + public static final String PROPERTY_STEPS = "slingevent:steps"; + + /** Property for Step. */ + public static final String PROPERTY_STEP = "slingevent:step"; + + /** Property for final message. */ + public static final String PROPERTY_MESSAGE = "slingevent:message"; + private final ValueMap properties; private final String topic; @@ -238,6 +254,45 @@ public class JobImpl implements Job { */ public void prepare() { this.properties.remove(JobImpl.PROPERTY_DELAY_OVERRIDE); + this.properties.remove(JobImpl.PROPERTY_LOG); + this.properties.remove(JobImpl.PROPERTY_ETA); + this.properties.remove(JobImpl.PROPERTY_STEPS); + this.properties.remove(JobImpl.PROPERTY_STEP); + this.properties.remove(JobImpl.PROPERTY_MESSAGE); + } + + public String update(final long eta) { + this.setProperty(JobImpl.PROPERTY_ETA, eta); + return JobImpl.PROPERTY_ETA; + } + + public String startProgress(final long eta) { + this.setProperty(JobImpl.PROPERTY_ETA, eta); + return JobImpl.PROPERTY_ETA; + } + + public String startProgress(final int steps) { + this.setProperty(JobImpl.PROPERTY_STEPS, steps); + return JobImpl.PROPERTY_STEPS; + } + + public String setProgress(final int step) { + this.setProperty(JobImpl.PROPERTY_STEP, step); + return JobImpl.PROPERTY_STEP; + } + + public String log(final String message, Object... args) { + final String logEntry = MessageFormat.format(message, args); + final String[] entries = this.getProperty(JobImpl.PROPERTY_LOG, String[].class); + if ( entries == null ) { + this.setProperty(JobImpl.PROPERTY_LOG, new String[] {logEntry}); + } else { + final String[] newEntries = new String[entries.length + 1]; + System.arraycopy(entries, 0, newEntries, 0, entries.length); + newEntries[entries.length] = logEntry; + this.setProperty(JobImpl.PROPERTY_LOG, newEntries); + } + return JobImpl.PROPERTY_LOG; } @Override Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1524326&r1=1524325&r2=1524326&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Sep 18 08:07:13 2013 @@ -1122,6 +1122,11 @@ public class JobManagerImpl mvm.put(Job.PROPERTY_JOB_QUEUE_NAME, info.getJob().getQueueName()); mvm.put(Job.PROPERTY_JOB_RETRIES, info.getJob().getNumberOfRetries()); mvm.put(Job.PROPERTY_JOB_PRIORITY, info.getJob().getJobPriority().name()); + mvm.remove(JobImpl.PROPERTY_ETA); + mvm.remove(JobImpl.PROPERTY_STEPS); + mvm.remove(JobImpl.PROPERTY_STEP); + mvm.remove(JobImpl.PROPERTY_LOG); + mvm.remove(JobImpl.PROPERTY_MESSAGE); resolver.commit(); return true; @@ -1354,4 +1359,25 @@ public class JobManagerImpl public TopologyCapabilities getTopologyCapabilities() { return this.topologyCapabilities; } + + public void updateProperty(final JobImpl job, final String propName) { + ResourceResolver resolver = null; + try { + resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null); + final Resource jobResource = resolver.getResource(job.getResourcePath()); + if ( jobResource != null ) { + final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class); + mvm.put(propName, job.getProperty(propName)); + resolver.commit(); + } + } catch ( final PersistenceException ignore ) { + this.ignoreException(ignore); + } catch ( final LoginException ignore ) { + this.ignoreException(ignore); + } finally { + if ( resolver != null ) { + resolver.close(); + } + } + } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1524326&r1=1524325&r2=1524326&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Sep 18 08:07:13 2013 @@ -511,6 +511,7 @@ public abstract class AbstractJobQueue */ @Override public void run() { + final Object lock = new Object(); final Thread currentThread = Thread.currentThread(); // update priority and name final String oldName = currentThread.getName(); @@ -531,43 +532,37 @@ public abstract class AbstractJobQueue final AtomicBoolean isAsync = new AtomicBoolean(false); try { - synchronized ( job ) { - job.prepare(); + synchronized ( lock ) { result = consumer.process(job, new JobExecutionContext() { @Override public void update(final long eta) { - // TODO Auto-generated method stub - + handler.updateProperty(job.update(eta)); } @Override public void startProgress(final long eta) { - // TODO Auto-generated method stub - + handler.updateProperty(job.startProgress(eta)); } @Override public void startProgress(final int steps) { - // TODO Auto-generated method stub - + handler.updateProperty(job.startProgress(steps)); } @Override public void setProgress(final int step) { - // TODO Auto-generated method stub - + handler.updateProperty(job.setProgress(step)); } @Override public void log(final String message, Object... args) { - // TODO Auto-generated method stub - + handler.updateProperty(job.log(message, args)); } @Override public void asyncProcessingFinished(final JobStatus status) { - synchronized ( job ) { + synchronized ( lock ) { if ( isAsync.compareAndSet(true, false) ) { finishedJob(job.getId(), status, true); asyncCounter.decrementAndGet(); @@ -606,8 +601,7 @@ public abstract class AbstractJobQueue pool.execute(task); } else { // if we don't have a thread pool, we create the thread directly - // (this should never happen for jobs, but is a safe fallback and - // allows to call this method for other background processing. + // (this should never happen for jobs, but is a safe fallback) new Thread(task).start(); } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java?rev=1524326&r1=1524325&r2=1524326&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java Wed Sep 18 08:07:13 2013 @@ -72,7 +72,8 @@ public interface JobExecutionContext { /** * Log a message. - * The message might contain place holders for additional arguments. + * The message and the arguments are passed to the {@link java.text.MessageFormat} + * class. * @param message A message * @param args Additional arguments */