Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Created by ieb on 13/04/2016.
+ * This has to be part of the API to prevent all sorts of other issues.
+ * <p>
+ * Non-instantiable holder for the {@link JobQueue} and {@link JobType} name
+ * types, together with the static factories that build them from Strings.
+ */
+public final class Types {
+
+    /**
+     * Queue that matches any queue: its {@code equals} always returns true and
+     * every queue created by {@link #jobQueue(String)} reports itself equal to it.
+     * Declared final so the shared wildcard instance cannot be replaced.
+     */
+    public static final JobQueue ANY_JOB_QUEUE = new AnyJobQueue();
+
+    private Types() {
+        // static members only, never instantiated
+    }
+
+    /**
+     * A named job queue that can be converted into the corresponding
+     * {@code org.apache.sling.mom.Types} queue or topic name.
+     */
+    public interface JobQueue {
+        org.apache.sling.mom.Types.QueueName asQueueName();
+
+        org.apache.sling.mom.Types.TopicName asTopicName();
+
+    }
+
+    /**
+     * Marker type for the name of a job type.
+     */
+    public interface JobType {
+    }
+
+    /**
+     * Wraps a queue name.
+     *
+     * @param jobQueue the queue name.
+     * @return a JobQueue wrapping the name.
+     */
+    public static JobQueue jobQueue(String jobQueue) {
+        return new JobQueueImpl(jobQueue);
+    }
+
+    /**
+     * Wraps a job type name.
+     *
+     * @param jobType the job type name.
+     * @return a JobType wrapping the name.
+     */
+    public static JobType jobType(String jobType) {
+        return new JobTypeImpl(jobType);
+    }
+
+    /**
+     * Wraps each name of the array into a JobType.
+     *
+     * @param types the job type names.
+     * @return an unmodifiable set with one JobType per distinct name.
+     */
+    public static Set<JobType> jobType(String[] types) {
+        Set<JobType> hs = new HashSet<JobType>();
+        for (String s : types) {
+            hs.add(jobType(s));
+        }
+        return Collections.unmodifiableSet(hs);
+    }
+
+    /**
+     * Wraps a JobType.
+     */
+    private static class JobTypeImpl extends StringWrapper implements JobType {
+
+        private JobTypeImpl(String jobType) {
+            super(jobType);
+        }
+    }
+
+    /**
+     * Wraps a JobQueue.
+     */
+    private static class JobQueueImpl extends StringWrapper implements JobQueue {
+
+        private JobQueueImpl(String jobQueue) {
+            super(jobQueue);
+        }
+
+        /**
+         * Equal to the wildcard queue, otherwise falls back to the comparison
+         * of the wrapped name performed by the super class.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            return obj == ANY_JOB_QUEUE || super.equals(obj);
+        }
+
+        @Override
+        public org.apache.sling.mom.Types.QueueName asQueueName() {
+            return org.apache.sling.mom.Types.queueName(toString());
+        }
+
+        @Override
+        public org.apache.sling.mom.Types.TopicName asTopicName() {
+            return org.apache.sling.mom.Types.topicName(toString());
+        }
+    }
+
+    /**
+     * Special JobQueue to match any. Wraps the name "*".
+     */
+    private static class AnyJobQueue extends JobQueueImpl {
+
+        private AnyJobQueue() {
+            super("*");
+        }
+
+        /**
+         * Matches everything, so always true regardless of the argument.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            return true;
+        }
+
+        /**
+         * Compares as equal to every String.
+         */
+        @Override
+        public int compareTo(String o) {
+            return 0;
+        }
+    }
+
+    /**
+     * Base class holding the wrapped String. Hash code, ordering and string
+     * form are those of the wrapped value.
+     */
+    private static class StringWrapper implements Comparable<String> {
+
+        private final String value;
+
+        private StringWrapper(String value) {
+            this.value = value;
+        }
+
+        @Override
+        public int hashCode() {
+            return value.hashCode();
+        }
+
+        /**
+         * Compares the wrapped value with the string form of the argument, so
+         * any object whose {@code toString()} equals the value is equal.
+         * A null argument is never equal (previously this threw a
+         * NullPointerException).
+         */
+        @Override
+        public boolean equals(Object obj) {
+            return obj != null && value.equals(obj.toString());
+        }
+
+        @Override
+        public int compareTo(String o) {
+            return value.compareTo(o);
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+    }
+
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/Types.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl;
+
+import org.apache.sling.jobs.Job;
+import org.apache.sling.jobs.JobBuilder;
+import org.apache.sling.jobs.Types;
+import org.apache.sling.jobs.impl.spi.JobStarter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by ieb on 29/03/2016.
+ * Provides an implementation of a JobBuilder.
+ * <p>
+ * Collects properties for a job with a generated id and, on {@link #add()},
+ * hands the new job to the JobStarter supplied at construction.
+ */
+public class JobBuilderImpl implements JobBuilder {
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobBuilderImpl.class);
+
+    /** Id generated once per builder and given to the job it creates. */
+    private final String id = Utils.generateId();
+    /** Properties accumulated through addProperties. */
+    private final Map<String, Object> properties = new HashMap<String, Object>();
+    private final JobStarter jobStarter;
+    private final Types.JobQueue queue;
+    private final Types.JobType jobType;
+
+    /**
+     * @param jobStarter used to start the job when {@link #add()} is called.
+     * @param topic the queue the job is created for.
+     * @param jobType the type of the job.
+     */
+    public JobBuilderImpl(JobStarter jobStarter, Types.JobQueue topic, Types.JobType jobType) {
+        this.jobStarter = jobStarter;
+        this.queue = topic;
+        this.jobType = jobType;
+    }
+
+    /**
+     * Copies all entries of the supplied map into the builder.
+     *
+     * @param props the properties to add.
+     * @return this builder.
+     */
+    @Nonnull
+    @Override
+    public JobBuilder addProperties(@Nonnull Map<String, Object> props) {
+        properties.putAll(props);
+        return this;
+    }
+
+    /**
+     * Creates the job from the collected state and passes it to the JobStarter.
+     *
+     * @return the job returned by the JobStarter.
+     */
+    @Nonnull
+    @Override
+    public Job add() {
+        final Job created = new JobImpl(queue, id, jobType, properties);
+        return jobStarter.start(created);
+    }
+
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobBuilderImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.sling.jobs.*;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * JobImpl is a data object to hold the current state of the job in the 
current JVM as loaded into memory.
+ * The JobImpl also listens for JobUpdates.
+ * Created by ieb on 23/03/2016.
+ */
+public class JobImpl implements Job, JobUpdateListener {
+    private final Types.JobQueue jobQueue;
+    private final String id;
+    private final Map<String, Object> properties = new HashMap<String, 
Object>();
+    private int retryCount;
+    private int numberOfRetries;
+    private long startedAt;
+    private long createdAt;
+    private long lastUpdate = 0;
+    private JobState jobState;
+    private long finishedAt;
+    private String resultMessage;
+    private JobController jobController;
+    private Types.JobType jobType;
+
+    public JobImpl(@Nonnull Types.JobQueue jobQueue, @Nonnull String id, 
@Nonnull Types.JobType jobType, @Nonnull Map<String, Object> properties) {
+        this.jobQueue = jobQueue;
+        this.jobType = jobType;
+        this.id = id;
+        this.resultMessage = "";
+        this.createdAt = System.currentTimeMillis();
+        this.jobState = JobState.CREATED;
+        this.properties.putAll(properties);
+    }
+
+    public JobImpl(JobUpdate update) {
+        this.jobQueue = update.getQueue();
+        this.id = update.getId();
+        update(update);
+        updateProperties(update.getProperties());
+    }
+
+    @Nonnull
+    @Override
+    public Types.JobQueue getQueue() {
+        return jobQueue;
+    }
+
+    @Nonnull
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Nonnull
+    @Override
+    public Types.JobType getJobType() {
+        return jobType;
+    }
+
+    @Nonnull
+    @Override
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    @Override
+    public int getNumberOfRetries() {
+        return numberOfRetries;
+    }
+
+    @Override
+    public long getStarted() {
+        return startedAt;
+    }
+
+    @Override
+    public long getCreated() {
+        return createdAt;
+    }
+
+    @Nonnull
+    @Override
+    public JobState getJobState() {
+        return jobState;
+    }
+
+    @Override
+    public void setState(@Nonnull JobState newState) {
+        jobState = newState;
+    }
+
+    @Override
+    public long getFinished() {
+        return finishedAt;
+    }
+
+    @Nullable
+    @Override
+    public String getResultMessage() {
+        return resultMessage;
+    }
+
+
+    @Nullable
+    @Override
+    public JobController getController() {
+        return jobController;
+    }
+
+    @Override
+    public void setJobController(@Nonnull JobController jobController) {
+        this.jobController = jobController;
+    }
+
+    @Override
+    public void removeJobController() {
+        jobController = null;
+    }
+
+    /**
+     * Apply an job update to this job, checking that the update is valid for 
the job.
+     * @param jobUpdate
+     */
+    @Override
+    public void update(@Nonnull JobUpdate jobUpdate) {
+        if  ( id.equals(jobUpdate.getId()) && ( jobQueue == 
Types.ANY_JOB_QUEUE || jobQueue.equals(jobUpdate.getQueue()))) {
+            // Start Job commands always go onto a queue and dont expire.
+            if ( jobUpdate.getCommand() != 
JobUpdate.JobUpdateCommand.START_JOB && jobUpdate.expires() < 
System.currentTimeMillis()) {
+                throw new IllegalStateException(
+                        "JobUpdate has expired, can't be applied. Expired at 
"+jobUpdate.expires()+
+                                ", time now "+System.currentTimeMillis()+
+                                " expired 
"+(System.currentTimeMillis()-jobUpdate.expires())+" ms ago.");
+            }
+            if (jobUpdate.updateTimestamp() < lastUpdate ) {
+                throw new IllegalStateException("JobUpdate received out of 
sequence, cant be applied. Last Update was at "+lastUpdate+" this update is at 
"+jobUpdate.updateTimestamp());
+            }
+            lastUpdate = jobUpdate.updateTimestamp();
+            switch(jobUpdate.getCommand()) {
+                case START_JOB:
+                    updateState(jobUpdate);
+                    updateProperties(jobUpdate.getProperties());
+                    break;
+                case UPDATE_JOB:
+                    // note, when job first comes into existence it is 
updated, then started.
+                    // the start message is a queued message, the update is a 
jobQueue or pub sub message.
+                    updateState(jobUpdate);
+                    updateProperties(jobUpdate.getProperties());
+                    break;
+                case RETRY_JOB:
+                    updateState(jobUpdate);
+                    // Allow more retries.
+                    numberOfRetries = retryCount + numberOfRetries;
+                    // TODO: trigger retry if required.
+                    updateProperties(jobUpdate.getProperties());
+                    break;
+                case STOP_JOB:
+                    if (jobController != null) {
+                        jobController.stop();
+                    }
+                    break;
+                case ABORT_JOB:
+                    if (jobController != null) {
+                        jobController.abort();
+                    }
+                    break;
+            }
+        } else {
+            throw new IllegalArgumentException("Cant update job with jobUpdate 
that doesn't match id and jobQueue ");
+        }
+    }
+
+    /**
+     * Update the properties taking into account any PropertyActions required.
+     * @param properties the update properties.
+     */
+    private void updateProperties(@Nonnull Map<String, Object> properties) {
+        Preconditions.checkNotNull(properties, "Properties cant be null.");
+        for (Map.Entry<String, Object> e : properties.entrySet()) {
+            if (e.getValue() instanceof JobUpdate.JobPropertyAction ) {
+                switch(((JobUpdate.JobPropertyAction)e.getValue())) {
+                    case REMOVE:
+                        this.properties.remove(e.getKey());
+                        break;
+                }
+            } else {
+                this.properties.put(e.getKey(), e.getValue());
+            }
+        }
+    }
+
+    /**
+     * Update the jobstate data for the job.
+     * @param jobUpdate
+     */
+    private void updateState(@Nonnull JobUpdate jobUpdate) {
+        retryCount = jobUpdate.getRetryCount();
+        jobType = jobUpdate.getJobType();
+        numberOfRetries = jobUpdate.getNumberOfRetries();
+        startedAt = jobUpdate.getStarted();
+        createdAt = jobUpdate.getCreated();
+        finishedAt = jobUpdate.getFinished();
+        resultMessage = jobUpdate.getResultMessage();
+        jobState = jobUpdate.getState();
+    }
+
+    /**
+     * Get a JobUpdateBuilder for this Job.
+     * @return the job update builder.
+     */
+    @Nonnull
+    @Override
+    public JobUpdateBuilder newJobUpdateBuilder() {
+        return new JobUpdateBuilderImpl(this);
+    }
+
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl;
+
+import org.apache.sling.jobs.*;
+import org.apache.sling.jobs.impl.spi.JobStarter;
+import org.apache.sling.jobs.impl.spi.JobStorage;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Created by ieb on 29/03/2016.
+ * Implements a JobManager, storing Jobs in a JobStorage implementation.
+ * Implements a JobStarter so it can start Jobs and queue them by sending a 
message.
+ * Implements a JobUpdateListener so it can store inbound update messages.
+ * Requires a JobStorage implementation to store jobs and a JobListener to 
process update messages.
+ * Requires a JobUpdateLister which it uses to send JobUpdates to.
+ * Does not run jobs, that is performed by implementations of JobConsumer 
listening to queues.
+ */
+public  class JobManagerImpl implements JobManager, JobStarter, 
JobUpdateListener {
+
+
+    /**
+     * Storage for jobs.
+     */
+    private final JobStorage jobStorage;
+    /**
+     * A listener for job updates.
+     */
+    private final JobUpdateListener messageSender;
+
+    /**
+     *
+     * @param jobStorage
+     * @param messageSender
+     */
+    public JobManagerImpl(JobStorage jobStorage, JobUpdateListener 
messageSender) {
+        this.jobStorage = jobStorage;
+        this.messageSender = messageSender;
+    }
+
+    @Nonnull
+    @Override
+    public JobBuilder newJobBuilder(@Nonnull Types.JobQueue queue, @Nonnull 
Types.JobType jobType) {
+        return new JobBuilderImpl(this, queue, jobType);
+    }
+
+    @Nullable
+    @Override
+    public Job getJobById(@Nonnull String jobId) {
+        return jobStorage.get(jobId);
+    }
+
+
+    @Nullable
+    @Override
+    public Job getJob(@Nonnull Types.JobQueue queue, @Nonnull Map<String, 
Object> template) {
+        throw new UnsupportedOperationException("TODO, implementation 
required");
+    }
+
+    @Nonnull
+    @Override
+    public Collection<Job> findJobs(@Nonnull QueryType type, @Nonnull 
Types.JobQueue queue, long limit, @Nullable Map<String, Object>... templates) {
+        throw new UnsupportedOperationException("TODO, implementation 
required");
+    }
+
+    @Override
+    public void stopJobById(@Nonnull String jobId) {
+        Job job = getJobById(jobId);
+        if ( job != null) {
+            
messageSender.update(job.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.STOP_JOB).build());
+        }
+    }
+
+    @Override
+    public boolean abortJob(@Nonnull String jobId) {
+        Job job = getJobById(jobId);
+        if ( job != null) {
+            
messageSender.update(job.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.ABORT_JOB).build());
+            return true;
+        } else {
+            messageSender.update(new 
JobUpdateBuilderImpl(jobId).command(JobUpdate.JobUpdateCommand.ABORT_JOB).build());
+            return false;
+        }
+    }
+
+    @Nullable
+    @Override
+    public Job retryJobById(@Nonnull String jobId) {
+        Job job = getJobById(jobId);
+        if (job != null) {
+            
messageSender.update(job.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.RETRY_JOB).build());
+        }
+        return job;
+    }
+
+
+    @Override
+    public Job start(Job j) {
+        j.setState(Job.JobState.QUEUED);
+        
messageSender.update(j.newJobUpdateBuilder().command(JobUpdate.JobUpdateCommand.START_JOB).putAll(j.getProperties()).build());
+        return jobStorage.put(j);
+    }
+
+
+    @Override
+    public void update(@Nonnull JobUpdate update) {
+        Job j = jobStorage.get(update.getId());
+        if ( j instanceof JobUpdateListener ) {
+            ((JobUpdateListener) j).update(update);
+        } else {
+            jobStorage.put(new JobImpl(update));
+        }
+    }
+
+    public void dispose() {
+    }
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobManagerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.jobs.*;
+import org.apache.sling.mom.*;
+import org.apache.sling.mom.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * Created by ieb on 12/04/2016.
+ * This is a configuration factory that creates QueueReader instances on 
configuration. These connect to the JobManager
+ * service and are registered using the OSGi Whiteboard pattern with the 
QueueManager. The JobManager service must implement JobConsumer.
+ *
+ */
+@Component(configurationFactory = true,
+        policy = ConfigurationPolicy.REQUIRE,
+        metatype = true,
+        immediate = true)
+@Properties({
+    @Property(name= QueueReader.QUEUE_NAME_PROP)
+})
+@Service(value = QueueReader.class)
+public class JobQueueConsumerFactory implements QueueReader, MessageFilter {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JobQueueConsumerFactory.class);
+    private static final Set<JobUpdate.JobUpdateCommand> ALLOWED_COMMANDS = 
ImmutableSet.of(JobUpdate.JobUpdateCommand.UPDATE_JOB) ;
+
+    @Reference
+    private JobManager jobManager;
+
+    @Reference
+    private TopicManager topicManager;
+    @Reference
+    private QueueManager queueManager;
+
+    @Activate
+    public void activate(Map<String, Object> properties) {
+        if ( !(jobManager instanceof JobConsumer) ) {
+            LOGGER.error("JobManager must implement JobConsumer interface. {} 
does not. ", jobManager.getClass());
+            throw new IllegalStateException("JobManager does not implement 
JobConsumer");
+        }
+    }
+
+    @Deactivate
+    public void deactivate(@SuppressWarnings("UnusedParameters") Map<String, 
Object> properties) {
+    }
+
+
+
+
+    @Override
+    public void onMessage(Types.QueueName queueName, Map<String, Object> 
message) throws RequeueMessageException {
+
+        final Job job = new JobImpl(new JobUpdateImpl(message));
+
+
+        ((JobConsumer)jobManager).execute(job, new JobUpdateListener() {
+            @Override
+            public void update(@Nonnull JobUpdate update) {
+                if (update.getId() != job.getId() || 
!ALLOWED_COMMANDS.contains(update.getCommand())) {
+
+                    throw new IllegalArgumentException("Not allowed to update 
other jobs or issue reserved commands when updating the state of a running 
job.");
+                }
+                topicManager.publish(update.getQueue().asTopicName(), 
update.getCommand().asCommandName(), Utils.toMapValue(update));
+            }
+        }, new JobCallback() {
+            @Override
+            public void callback(Job finalJobState) {
+                if (finalJobState.getId() != job.getId()) {
+                    throw new IllegalArgumentException("Final Job state ID 
must match initial JobState ID");
+                }
+                JobUpdate finalJobUpdate = finalJobState.newJobUpdateBuilder()
+                        .command(JobUpdate.JobUpdateCommand.UPDATE_JOB)
+                        .putAll(finalJobState.getProperties())
+                        .build();
+                topicManager.publish(finalJobUpdate.getQueue().asTopicName(), 
finalJobUpdate.getCommand().asCommandName(), Utils.toMapValue(finalJobUpdate));
+            }
+        });
+
+
+    }
+
+    @Override
+    public boolean accept(Types.Name name, Map<String, Object> mapMessage) {
+        return !(jobManager instanceof MessageFilter) || ((MessageFilter) 
jobManager).accept(name, mapMessage);
+    }
+
+
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobQueueConsumerFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.impl;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.jobs.*;
+import org.apache.sling.jobs.impl.spi.JobStorage;
+import org.apache.sling.jobs.impl.storage.InMemoryJobStorage;
+import org.apache.sling.mom.QueueManager;
+import org.apache.sling.mom.TopicManager;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by ieb on 11/04/2016.
+ * NB, this does *not* register as a JobConsumer service. it implements a 
JobConsumer so that it can consume Jobs from JobQueueConsumers.
+ */
+@Component(immediate = true)
+@Service(value =  JobManager.class)
+public class JobSubsystem  implements JobManager, JobConsumer {
+
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JobSubsystem.class);
+    private JobManagerImpl manager;
+    private JobStorage jobStorage;
+    private OutboundJobUpdateListener messageSender;
+
+    /**
+     * Contains a map of JobConsumers wrapped by JobConsumerHolders keyed by 
ServiceReference.
+     */
+    @Reference(referenceInterface = JobConsumer.class,
+            cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+            policy = ReferencePolicy.DYNAMIC,
+            bind="addConsumer",
+            unbind="removeConsumer")
+    private final Map<ServiceReference<JobConsumer>, JobConsumerHolder> 
registrations =
+            new ConcurrentHashMap<ServiceReference<JobConsumer>, 
JobConsumerHolder>();
+
+    @Reference
+    private TopicManager topicManager;
+    @Reference
+    private QueueManager queueManager;
+
+    @Activate
+    public synchronized void activate(@SuppressWarnings("UnusedParameters") 
Map<String, Object> properties) {
+        jobStorage = new InMemoryJobStorage();
+        messageSender = new OutboundJobUpdateListener(topicManager, 
queueManager);
+        manager = new JobManagerImpl(jobStorage, messageSender);
+    }
+
+    @Deactivate
+    public synchronized void deactivate(@SuppressWarnings("UnusedParameters") 
Map<String, Object> properties) {
+        for (Map.Entry<ServiceReference<JobConsumer>, JobConsumerHolder> e : 
registrations.entrySet()) {
+            e.getValue().close();
+        }
+        registrations.clear();
+        manager.dispose();
+        messageSender.dispose();
+        jobStorage.dispose();
+    }
+
+    // --- Job Manager.
+    @Nonnull
+    @Override
+    public JobBuilder newJobBuilder(@Nonnull Types.JobQueue queue, @Nonnull 
Types.JobType jobType) {
+        return manager.newJobBuilder(queue, jobType);
+    }
+
+    @Nullable
+    @Override
+    public Job getJobById(@Nonnull String jobId) {
+        return manager.getJobById(jobId);
+    }
+
+    @Nullable
+    @Override
+    public Job getJob(@Nonnull Types.JobQueue queue, @Nonnull Map<String, 
Object> template) {
+        return manager.getJob(queue, template);
+    }
+
+    @Nonnull
+    @Override
+    public Collection<Job> findJobs(@Nonnull QueryType type, @Nonnull 
Types.JobQueue queue, long limit, @Nullable Map<String, Object>... templates) {
+        return manager.findJobs(type, queue, limit, templates);
+    }
+
+    @Override
+    public void stopJobById(@Nonnull String jobId) {
+        manager.stopJobById(jobId);
+    }
+
+    @Override
+    public boolean abortJob(@Nonnull String jobId) {
+        return manager.abortJob(jobId);
+    }
+
+    @Nullable
+    @Override
+    public Job retryJobById(@Nonnull String jobId) {
+        return manager.retryJobById(jobId);
+    }
+
+
+    // ---- JobConsumer Registration
+    // Register Consumers using
+    public synchronized  void addConsumer(ServiceReference<JobConsumer> 
serviceRef) {
+        if (registrations.containsKey(serviceRef)) {
+            LOGGER.error("Registration for service reference is already 
present {}",serviceRef);
+            return;
+        }
+        JobConsumerHolder jobConsumerHolder = new 
JobConsumerHolder(serviceRef.getBundle().getBundleContext().getService(serviceRef),
 getServiceProperties(serviceRef));
+        registrations.put(serviceRef, jobConsumerHolder);
+    }
+
+    private Map<Object, Object> 
getServiceProperties(ServiceReference<JobConsumer> serviceRef) {
+        ImmutableMap.Builder<Object, Object> builder = ImmutableMap.builder();
+        for ( String k : serviceRef.getPropertyKeys()) {
+            builder.put(k, serviceRef.getProperty(k));
+        }
+        return builder.build();
+    }
+
+    public synchronized void removeConsumer(ServiceReference<JobConsumer> 
serviceRef) {
+        JobConsumerHolder jobConsumerHolder = registrations.remove(serviceRef);
+        if ( jobConsumerHolder != null) {
+            jobConsumerHolder.close();
+        }
+    }
+
+
+    // ------- job execution, invoked by JobQueueConsumerFactory.
+    @Nonnull
+    @Override
+    public void execute(@Nonnull Job initialState, @Nonnull JobUpdateListener 
listener, @Nonnull JobCallback callback) {
+        // iterate over the entries. This should cause the entries to come out 
in natural key order
+        // which should respect any priority applied to the Services via 
ServiceReference. (TODO: check that is the case)
+        // TODO: add a Job controller to the job before executing.
+        for (Map.Entry<ServiceReference<JobConsumer>,JobConsumerHolder> e : 
registrations.entrySet()) {
+            JobConsumerHolder jobConsumerHolder = e.getValue();
+            if (jobConsumerHolder.accept(initialState.getJobType())) {
+                jobConsumerHolder.consumer.execute(initialState, listener, 
callback);
+                return;
+            }
+        }
+        throw new IllegalArgumentException("No JobConsumer able to process a 
job of type "+initialState.getJobType()+" can be found in this instance.");
+    }
+
+
+    /**
+     * Holds job consumers and configures a JobTypeValve delegating to the 
JobConsumer implementation if it implements that interface.
+     */
+    private class JobConsumerHolder implements JobTypeValve, Closeable {
+        private final JobConsumer consumer;
+        private final Set<Types.JobType> jobTypes;
+
+        public JobConsumerHolder(JobConsumer consumer, Map<Object, Object> 
properties) {
+            this.consumer = consumer;
+            if ( consumer instanceof JobTypeValve) {
+                jobTypes = ImmutableSet.of();
+            } else {
+                jobTypes = Types.jobType((String[]) 
properties.get(JobConsumer.JOB_TYPES));
+            }
+        }
+
+        @Override
+        public boolean accept(@Nonnull Types.JobType jobType) {
+            if ( consumer instanceof JobTypeValve) {
+                return ((JobTypeValve) consumer).accept(jobType);
+            }
+            return jobTypes.contains(jobType);
+        }
+
+        public void close() {
+            // nothing to do at the moment.
+        }
+    }
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobSubsystem.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.sling.jobs.Job;
+import org.apache.sling.jobs.JobUpdate;
+import org.apache.sling.jobs.JobUpdateBuilder;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Created by ieb on 23/03/2016.
+ * Builder for JobUpdate messages. A builder is created either from a Job, in which case any command and
+ * update properties may be used, or from a job ID alone, in which case only abort and stop updates can be built.
+ */
+public class JobUpdateBuilderImpl implements JobUpdateBuilder {
+
+    private final String jobId;
+    private final Job job;
+    private JobUpdate.JobUpdateCommand command;
+    private final ImmutableMap.Builder<String, Object> updateProperties = ImmutableMap.builder();
+
+    /**
+     * Create a JobUpdateBuilder from a job.
+     * @param job the job the update applies to.
+     */
+    public JobUpdateBuilderImpl(@Nonnull Job job) {
+        this.job = job;
+        this.jobId = null;
+    }
+
+    /**
+     * Create a JobUpdateBuilder from a job ID alone.
+     * @param jobId the ID of the job the update applies to.
+     */
+    public JobUpdateBuilderImpl(@Nonnull String jobId) {
+        this.jobId = jobId;
+        this.job = null;
+
+    }
+
+    /**
+     * Set the JobUpdateCommand
+     * @param command the command.
+     * @return this JobBuilder instance.
+     */
+    @Nonnull
+    @Override
+    public JobUpdateBuilder command(@Nonnull JobUpdate.JobUpdateCommand command) {
+        this.command = command;
+        return this;
+    }
+
+    /**
+     * Set a property to update.
+     * @param name the name of the property
+     * @param value the value of the property which may be null. To remove the property set the value to JobUpdate.JobPropertyAction.REMOVE.
+     *              A null value is recorded as JobUpdate.JobPropertyAction.REMOVE.
+     * @return this JobBuilder instance.
+     */
+    @Nonnull
+    @Override
+    public JobUpdateBuilder put(@Nonnull String name, @Nullable Object value) {
+        if ( value == null) {
+            this.updateProperties.put(name, JobUpdate.JobPropertyAction.REMOVE);
+        } else {
+            this.updateProperties.put(name, value);
+        }
+        return this;
+    }
+
+    /**
+     * Set several properties to update. Each entry is handled exactly like a call to
+     * {@link #put(String, Object)}, so a null value is recorded as JobUpdate.JobPropertyAction.REMOVE
+     * rather than being rejected by the underlying immutable map builder.
+     * @param properties the properties to add to the update.
+     * @return this JobBuilder instance.
+     */
+    @Nonnull
+    @Override
+    public JobUpdateBuilder putAll(@Nonnull Map<String, Object> properties) {
+        for (Map.Entry<String, Object> property : properties.entrySet()) {
+            put(property.getKey(), property.getValue());
+        }
+        return this;
+    }
+
+
+    /**
+     * Build the JobUpdate.
+     * @return the JobUpdate.
+     * @throws IllegalStateException if the builder was created from a job ID and the command is neither
+     *         ABORT_JOB nor STOP_JOB.
+     */
+    @Nonnull
+    @Override
+    public JobUpdate build() {
+        if ( job != null) {
+            return new JobUpdateImpl(job, command, updateProperties.build());
+        } else if ( command == JobUpdate.JobUpdateCommand.ABORT_JOB || command == JobUpdate.JobUpdateCommand.STOP_JOB) {
+            return new JobUpdateImpl(jobId, command);
+        } else {
+            throw new IllegalStateException("Only possible to abort or stop a job by ID alone ");
+        }
+    }
+
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateBuilderImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.sling.jobs.Job;
+import org.apache.sling.jobs.JobUpdate;
+import org.apache.sling.jobs.Types;
+import org.apache.sling.jobs.impl.spi.MapValueAdapter;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents messages sent to the Job via a message queue.
+ * Abort, stop and update messages should be sent via a priority queue.
+ * Start messages should be sent by a processing queue.
+ * Created by ieb on 23/03/2016.
+ */
+public class JobUpdateImpl implements MapValueAdapter,  JobUpdate {
+    /** Lifetime of an update message in milliseconds, added to the update timestamp to get the expiry. */
+    private static final long TTL = 1000 * 60;
+    private long updateTimestamp;
+    private long expires;
+    private JobUpdateCommand command;
+    private Types.JobQueue jobQueue;
+    private String id;
+    private Job.JobState jobState;
+    private Map<String, Object> properties;
+    private int retryCount;
+    private int numberOfRetries;
+    private long startedAt;
+    private long createdAt;
+    private long finishedAt;
+    private String resultMessage;
+    private Types.JobType jobType;
+
+
+    /**
+     * Create an update message using a job, command and update properties. Only the update properties will be included in the update.
+     * The job will be used to specify the job jobQueue, job ID and job state of the update message.
+     * @param job the job
+     * @param command the command
+     * @param properties properties in the update message.
+     */
+    public JobUpdateImpl(@Nonnull Job job, @Nonnull JobUpdateCommand command, @Nonnull Map<String, Object> properties) {
+        Preconditions.checkNotNull(job, "Job argument cant be null");
+        Preconditions.checkNotNull(command, "JobCommand argument cant be null");
+        Preconditions.checkNotNull(properties, "Map of properties cant be null");
+
+        jobQueue = job.getQueue();
+        jobType = job.getJobType();
+        id = job.getId();
+        startedAt = job.getStarted();
+        createdAt = job.getCreated();
+        finishedAt = job.getFinished();
+        retryCount = job.getRetryCount();
+        jobState = job.getJobState();
+        resultMessage = job.getResultMessage();
+        numberOfRetries = job.getNumberOfRetries();
+        updateTimestamp = System.currentTimeMillis();
+        expires = updateTimestamp + TTL;
+        this.command = command;
+        this.properties = properties;
+    }
+
+    /**
+     * Create a JobUpdateImpl based on a inbound message in the form of a Map.
+     * @param message a inbound message in map form.
+     */
+    public JobUpdateImpl(@Nonnull Map<String, Object> message) {
+        Preconditions.checkNotNull(message, "Message cant be null");
+        fromMapValue(message);
+    }
+
+    /**
+     * Create an update message addressed to a job by ID alone. The queue is set to Types.ANY_JOB_QUEUE, the
+     * state to Job.JobState.ANY_STATE and the update carries no properties.
+     * @param jobId the ID of the job.
+     * @param command the command.
+     */
+    public JobUpdateImpl(@Nonnull String jobId, @Nonnull JobUpdateCommand command) {
+        Preconditions.checkNotNull(jobId, "JobId argument cant be null");
+        Preconditions.checkNotNull(command, "JobUpdateCommand argument cant be null");
+        jobQueue = Types.ANY_JOB_QUEUE;
+        // NOTE(review): jobType is left unset here, so getJobType() returns null and toMapValue() fails on
+        // jobType.toString() for ID-only updates. A wildcard job type is probably needed - confirm.
+        id = jobId;
+        updateTimestamp = System.currentTimeMillis();
+        expires = updateTimestamp + TTL;
+        jobState = Job.JobState.ANY_STATE;
+        this.command = command;
+        this.properties = ImmutableMap.of();
+
+    }
+
+
+    @Override
+    public long updateTimestamp() {
+        return updateTimestamp;
+    }
+
+    @Override
+    public long expires() {
+        return expires;
+    }
+
+    @Nonnull
+    @Override
+    public Types.JobType getJobType() {
+        return jobType;
+    }
+
+    @Nonnull
+    @Override
+    public JobUpdateCommand getCommand() {
+        return command;
+    }
+
+    @Nonnull
+    @Override
+    public Types.JobQueue getQueue() {
+        return jobQueue;
+    }
+
+    @Nonnull
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Nonnull
+    @Override
+    public Job.JobState getState() {
+        return jobState;
+    }
+
+    @Nonnull
+    @Override
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    @Override
+    public int getNumberOfRetries() {
+        return numberOfRetries;
+    }
+
+    @Override
+    public long getStarted() {
+        return startedAt;
+    }
+
+    @Override
+    public long getCreated() {
+        return createdAt;
+    }
+
+    @Override
+    public long getFinished() {
+        return finishedAt;
+    }
+
+    @Override
+    public String getResultMessage() {
+        return resultMessage;
+    }
+
+    /**
+     * Update, start and retry messages carry the full job state and properties; all other commands carry
+     * only the addressing fields.
+     */
+    private static boolean carriesJobState(JobUpdateCommand command) {
+        return command == JobUpdateCommand.UPDATE_JOB
+                || command == JobUpdateCommand.START_JOB
+                || command == JobUpdateCommand.RETRY_JOB;
+    }
+
+
+    /**
+     * Populates this update from a message in map form, as produced by {@link #toMapValue()}.
+     * @param mapValue the message, which must be a Map.
+     * @throws IllegalArgumentException if mapValue is null or not a Map.
+     */
+    @Override
+    public void fromMapValue(@Nullable Object mapValue) {
+        // instanceof is false for null, so no separate null check is needed.
+        if (mapValue instanceof Map) {
+            @SuppressWarnings("unchecked") Map<String, Object> m = (Map<String, Object>) mapValue;
+            jobQueue = Types.jobQueue((String) Utils.getRequired(m, "tp"));
+            jobType = Types.jobType((String)Utils.getRequired(m, "jt"));
+            id = Utils.getRequired(m, "id");
+            command = JobUpdateCommand.valueOf((String) Utils.getRequired(m, "cm"));
+            updateTimestamp = Utils.getRequired(m, "ts");
+            expires =  Utils.getRequired(m, "ex");
+            if (carriesJobState(command)) {
+                startedAt = Utils.getOptional(m, "startedAt", 0L);
+                createdAt = Utils.getOptional(m, "createdAt", 0L);
+                finishedAt = Utils.getOptional(m, "finishedAt", 0L);
+                retryCount = Utils.getOptional(m, "retryCount", 0);
+                numberOfRetries = Utils.getOptional(m, "nRetries", 10);
+                jobState = Job.JobState.valueOf(Utils.getOptional(m, "jobState", Job.JobState.QUEUED.toString()));
+                resultMessage = Utils.getOptional(m, "resultMessage", null);
+                properties = Utils.getOptional(m, "properties", new HashMap<String, Object>());
+            } else {
+                properties = new HashMap<String, Object>();
+            }
+        } else {
+            throw new IllegalArgumentException("Cant populate JobUpdateImpl from "+mapValue);
+        }
+    }
+
+    /**
+     * Converts this update into an immutable map suitable for sending as a message.
+     * @return the update in map form.
+     */
+    @Override
+    @Nonnull
+    public Object toMapValue() {
+        ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+        builder.put("tp", jobQueue.toString());
+        builder.put("jt",jobType.toString());
+        builder.put("id",id);
+        builder.put("cm", command.toString());
+        builder.put("ts", this.updateTimestamp);
+        builder.put("ex", expires);
+        if (carriesJobState(command)) {
+            builder.put("retryCount", retryCount);
+            builder.put("nRetries", numberOfRetries);
+            builder.put("startedAt", startedAt);
+            builder.put("createdAt", createdAt);
+            builder.put("finishedAt", finishedAt);
+            builder.put("jobState", jobState.toString());
+            // ImmutableMap rejects null values; a job without a result message simply omits the key,
+            // which fromMapValue reads back as null.
+            if (resultMessage != null) {
+                builder.put("resultMessage", resultMessage);
+            }
+            builder.put("properties", ImmutableMap.builder().putAll(properties).build());
+
+        }
+        return builder.build();
+    }
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/JobUpdateImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.sling.jobs.JobManager;
+import org.apache.sling.jobs.JobUpdateListener;
+import org.apache.sling.mom.Subscriber;
+import org.apache.sling.mom.Types;
+
+import java.util.Map;
+
+/**
+ * Created by ieb on 30/03/2016.
+ * Listens to a topic to retrieve control messages.
+ */
+@Component(immediate = true, metatype = true)
+@Service(value = Subscriber.class)
+@Properties({
+        @Property(name= Subscriber.TOPIC_NAMES_PROP, cardinality = 
Integer.MAX_VALUE, value = {"sling/jobupdates"} )
+})
+public class ManagerSubscriber implements Subscriber {
+
+
+    @Reference
+    private JobManager jobManager;
+
+
+    @Override
+    public void onMessage(Types.TopicName topic, Map<String, Object> message) {
+        if (jobManager instanceof JobUpdateListener) {
+            ((JobUpdateListener) jobManager).update(new 
JobUpdateImpl(message));
+        }
+    }
+
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/ManagerSubscriber.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.impl;
+
+import org.apache.sling.jobs.JobUpdate;
+import org.apache.sling.jobs.JobUpdateListener;
+import org.apache.sling.jobs.Types;
+import org.apache.sling.mom.QueueManager;
+import org.apache.sling.mom.TopicManager;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Created by ieb on 30/03/2016.
+ * Sends messages out to JMS Queues or topics. Normally called by the local JobManager Implementation.
+ * Uses a TopicManager or QueueManager to perform the send operation: START_JOB updates are added to a
+ * queue, every other update is published to a topic.
+ */
+public class OutboundJobUpdateListener implements JobUpdateListener {
+
+
+    private boolean active;
+    private final TopicManager topicManager;
+    private final QueueManager queueManager;
+
+    /**
+     * Creates a listener that is active immediately.
+     *
+     * @param topicManager used to publish all updates other than START_JOB.
+     * @param queueManager used to enqueue START_JOB updates.
+     */
+    public OutboundJobUpdateListener(TopicManager topicManager, QueueManager queueManager ) {
+        this.topicManager = topicManager;
+        this.queueManager = queueManager;
+        active = true;
+    }
+
+    /**
+     * Deactivates the listener; updates received afterwards are ignored.
+     */
+    public void dispose() {
+        active = false;
+    }
+
+
+    /**
+     * Sends the update out, unless the listener has been disposed.
+     *
+     * @param update the update to send.
+     */
+    @Override
+    public void update(@Nonnull JobUpdate update) {
+        if ( !active ) {
+            return;
+        }
+        switch(update.getCommand()) {
+            case START_JOB:
+                // jobs to be started go onto the job queue.
+                queueManager.add(update.getQueue().asQueueName(), Utils.toMapValue(update));
+                return;
+            default:
+                // everything else is a control message published on the topic.
+                topicManager.publish(update.getQueue().asTopicName(), update.getCommand().asCommandName(), Utils.toMapValue(update));
+                return;
+        }
+    }
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/OutboundJobUpdateListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl;
+
+import org.apache.sling.jobs.impl.spi.MapValueAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.nio.charset.Charset;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by ieb on 29/03/2016.
+ */
+public class Utils {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
+    private static final Charset UTF8 = Charset.forName("UTF-8") ;
+    private static final String PROCESS_NAME = generateUniqueNamespace();
+    // Set the counter to the class load time.
+    private static final AtomicLong idCounter = new 
AtomicLong(System.currentTimeMillis());
+
+    /**
+     * Builds the namespace used to seed job IDs from the MAC address of the first non loopback,
+     * non virtual network interface that has a hardware address, and the ID of this process. When no
+     * MAC address can be determined a fake one based on the current time is used; when no process ID
+     * can be determined the current time is used instead.
+     * @return a string of the form {@code <mac address or fake>/<process id>/}.
+     */
+    @Nonnull
+    private static String generateUniqueNamespace() {
+        String macAddress = null;
+        // get the MAC address of the primary interface, failing that use a fake.
+        try {
+            Enumeration<NetworkInterface> netInterfaceE = NetworkInterface.getNetworkInterfaces();
+            // getNetworkInterfaces() may return null when the machine has no network interfaces.
+            while ( netInterfaceE != null && netInterfaceE.hasMoreElements()) {
+                NetworkInterface netInterface = netInterfaceE.nextElement();
+                byte[] hw = netInterface.getHardwareAddress();
+                if ( !netInterface.isLoopback() && !netInterface.isVirtual() && hw != null) {
+                    macAddress = tohex(hw);
+                    LOGGER.info("Job IDs seeded with MAC Address from interface {} ", netInterface);
+                    break;
+                }
+            }
+        } catch (SocketException e) {
+            LOGGER.warn("Unable to get MAC address, defaulting to fake ", e);
+        }
+        // applied after the try/catch so that a SocketException also ends up with a fake address
+        // rather than the literal string "null" in the namespace.
+        if ( macAddress == null) {
+            LOGGER.info("No MAC address available, seeding JobID from startup time.");
+            macAddress = "fake-" + System.currentTimeMillis();
+        }
+        long processID;
+        try {
+            // most JVMs.
+            processID = Long.parseLong(java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
+        } catch (Exception e) {
+            try {
+                // most Linux kernels.
+                processID = Long.parseLong(new File("/proc/self").getCanonicalFile().getName());
+            } catch (Exception e1) {
+                LOGGER.warn("Unable to get ProcessID by  address, defaulting to fake ", e);
+                processID = System.currentTimeMillis();  // this will be way beyond any process ID.
+            }
+        }
+        String baseId = macAddress + "/" + processID+ "/";
+        LOGGER.info("Job IDS base is {} ", baseId);
+        return  baseId;
+    }
+
+    @Nonnull
+    private static String tohex(@Nonnull byte[] bytes) {
+        StringBuilder sb = new StringBuilder();
+        for( byte b : bytes) {
+            sb.append(String.format("%02x",b));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Generate an ID based on the unique name of the jvm process and a 
counter.
+     * @return
+     */
+    @Nonnull
+    public static String generateId() {
+        try {
+            return 
Utils.tohex(MessageDigest.getInstance("SHA1").digest((Utils.PROCESS_NAME+idCounter.incrementAndGet()).getBytes(UTF8)));
+        } catch (NoSuchAlgorithmException nsae) {
+            throw new RuntimeException("SHA1 not supported", nsae);
+        }
+    }
+
+    @Nonnull
+    public static Map<String, Object> toMapValue(@Nonnull Object msg) {
+        if (msg instanceof Map) {
+            //noinspection unchecked
+            return (Map<String, Object>) msg;
+        } else if (msg instanceof MapValueAdapter) {
+            return toMapValue(((MapValueAdapter) msg).toMapValue());
+        }
+        throw new IllegalArgumentException("Unable to convert 
"+msg.getClass()+" to a Map.");
+    }
+
+
+    @Nonnull
+    public static <T> T getRequired(@Nonnull Map<String, Object> m, @Nonnull 
String name) {
+        if (m.containsKey(name)) {
+            //noinspection unchecked
+            if ( m.get(name) != null) {
+                return (T) m.get(name);
+            }
+        }
+        throw new IllegalArgumentException("Required key "+name+" is missing 
from "+m);
+    }
+
+    @Nullable
+    public static <T> T getOptional(@Nonnull Map<String, Object> m, @Nonnull 
String name, @Nullable T defaultValue) {
+        if (m.containsKey(name)) {
+            //noinspection unchecked
+            Object o = m.get(name);
+            if ( defaultValue instanceof Integer && o instanceof Long) {
+                return (T)(Integer) ((Long) o).intValue();
+            } else if ( defaultValue instanceof Float && o instanceof Double) {
+                return (T)(Float) ((Double) o).floatValue();
+            }
+            return (T) o;
+        }
+        return defaultValue;
+    }
+
+
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/Utils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl.spi;
+
+import org.apache.sling.jobs.Job;
+
+/**
+ * Created by ieb on 29/03/2016.
+ * Starts a job, used by the JobBuilderImpl to perform the add operation.
+ */
+public interface JobStarter {
+    /**
+     * Start the supplied job.
+     * @param j the job to start.
+     * @return the started job; presumably the same job that was passed in, confirm against the implementations.
+     */
+    Job start(Job j);
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStarter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.jobs.impl.spi;
+
+import org.apache.sling.jobs.Job;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/**
+ * Created by ieb on 29/03/2016.
+ * Provides JobStorage local to the JVM. An implementation may or may not decide to persist over restarts, page, etc.
+ */
+public interface JobStorage {
+
+
+    /**
+     * Get a Job by ID.
+     * @param jobId the job ID to get.
+     * @return the job or null if the job doesn't exist.
+     */
+    @Nullable
+    Job get(@Nonnull String jobId);
+
+    /**
+     * Put a Job into the Job Storage.
+     * @param job the job.
+     * @return the job just added.
+     */
+    @Nonnull
+    Job put(@Nonnull Job job);
+
+    /**
+     * Remove the Job with the given ID.
+     * @param jobId the ID of the job to remove.
+     * @return the job removed or null if not present.
+     */
+    @Nullable
+    Job remove(@Nonnull String jobId);
+
+    /**
+     * Remove the Job, returning the job removed.
+     * @param job the job to remove.
+     * @return the job removed, if the job was present, otherwise null.
+     */
+    @Nullable
+    Job remove(@Nonnull Job job);
+
+    /**
+     * Dispose of the JobStorage.
+     */
+    void dispose();
+
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/JobStorage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl.spi;
+
+/**
+ * Objects that can be converted to and from a Map are expected to implement this interface.
+ * The values contained in the Map are expected to be Maps or values that can be serialised into most
+ * common formats. It would be safe to use json or yaml as an example of a common format.
+ * Created by ieb on 28/03/2016.
+ *
+ */
+public interface MapValueAdapter {
+
+
+    /**
+     * Populate the object from a map value.
+     * @param mapValue the map value to populate this object from.
+     */
+    void fromMapValue(Object mapValue);
+
+    /**
+     * Adapt the object into a value suitable for use in a map to be serialised by standard map to json or yaml writers.
+     * @return a value, which may be a primitive, an array or a map of primitives.
+     */
+    Object toMapValue();
+
+
+
+
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/spi/MapValueAdapter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl.storage;
+
+import org.apache.sling.jobs.Job;
+import org.apache.sling.jobs.impl.spi.JobStorage;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by ieb on 29/03/2016.
+ * An unbounded local JVM job store backed by a ConcurrentHashMap keyed by job ID.
+ * Once dispose() has been called every other operation throws IllegalStateException.
+ */
+public class InMemoryJobStorage implements JobStorage {
+
+
+    // volatile so that a dispose() on one thread is visible to the others; each operation
+    // reads the field once through check() and works on that reference, so a concurrent
+    // dispose() cannot turn into a NullPointerException half way through an operation.
+    private volatile Map<String, Job> store = new ConcurrentHashMap<String, Job>();
+
+    /**
+     * Get a Job by ID.
+     * @param jobId the job ID to get.
+     * @return the job, or null if no job is stored under that ID.
+     * @throws IllegalStateException if the store has been disposed.
+     */
+    @Nullable
+    @Override
+    public Job get(@Nonnull String jobId) {
+        return check().get(jobId);
+    }
+
+    /**
+     * Store the job under its ID, replacing any job already stored under that ID.
+     * @param job the job.
+     * @return the job just added.
+     * @throws IllegalStateException if the store has been disposed.
+     */
+    @Nonnull
+    @Override
+    public Job put(@Nonnull Job job) {
+        check().put(job.getId(), job);
+        return job;
+    }
+
+    /**
+     * Remove the Job stored under the ID.
+     * @param jobId the ID of the job to remove.
+     * @return the job removed or null if not present.
+     * @throws IllegalStateException if the store has been disposed.
+     */
+    @Nullable
+    @Override
+    public Job remove(@Nonnull String jobId) {
+        // Map.remove returns the previous mapping, so lookup and removal are one atomic step.
+        return check().remove(jobId);
+    }
+
+    /**
+     * Remove the Job stored under the ID of the supplied job.
+     * @param job the job to remove.
+     * @return the job removed, if the job was present, otherwise null.
+     * @throws IllegalStateException if the store has been disposed.
+     */
+    @Nullable
+    @Override
+    public Job remove(@Nonnull Job job) {
+        check();
+        return remove(job.getId());
+    }
+
+    /**
+     * Verify the store is still open.
+     * @return the backing map, never null.
+     * @throws IllegalStateException if the store has been disposed.
+     */
+    private Map<String, Job> check() {
+        final Map<String, Job> current = store;
+        if ( current == null) {
+            throw new IllegalStateException("Job store already closed.");
+        }
+        return current;
+    }
+
+    /**
+     * Drop all stored jobs and close the store. Calling dispose() again has no effect.
+     */
+    @Override
+    public void dispose() {
+        final Map<String, Job> current = store;
+        store = null;
+        if ( current != null) {
+            current.clear();
+        }
+    }
+}

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/impl/storage/InMemoryJobStorage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@Version("1.0.0")
+package org.apache.sling.jobs;
+
+import aQute.bnd.annotation.Version;
+

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/main/java/org/apache/sling/jobs/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java?rev=1754255&view=auto
==============================================================================
--- 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java
 (added)
+++ 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java
 Wed Jul 27 12:10:12 2016
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.jobs.impl;
+
+import org.apache.sling.jobs.Job;
+import org.apache.sling.jobs.JobController;
+import org.apache.sling.jobs.Types;
+import org.apache.sling.jobs.impl.spi.JobStarter;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by ieb on 05/04/2016.
+ * Tests job builder: a job added through JobBuilderImpl must be handed to the JobStarter
+ * carrying the expected queue, properties and initial state.
+ */
+public class JobBuilderImplTest {
+
+    @Mock
+    private JobStarter jobStarter;
+
+    @Mock
+    private JobController jobController;
+
+    // Receives every job passed to the mocked JobStarter.
+    private Queue<Job> queue;
+
+    public JobBuilderImplTest() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    /**
+     * Stubs the JobStarter so that each started job is recorded in the queue and returned unchanged.
+     */
+    @Before
+    public void before() {
+        queue = new ArrayBlockingQueue<Job>(1);
+        final Answer<Job> recordStartedJob = new Answer<Job>() {
+            @Override
+            public Job answer(InvocationOnMock invocationOnMock) throws Throwable {
+                final Job started = (Job) invocationOnMock.getArguments()[0];
+                queue.add(started);
+                return started;
+            }
+        };
+        Mockito.when(jobStarter.start(Mockito.any(Job.class))).then(recordStartedJob);
+    }
+
+    /**
+     * Adds one job and checks the instance seen by the JobStarter.
+     */
+    @Test
+    public void testAddJob() {
+        final long earliest = System.currentTimeMillis();
+        final Map<String, Object> properties = new HashMap<String, Object>();
+        properties.put("job.name", "Jobname");
+
+        final JobBuilderImpl builder = new JobBuilderImpl(jobStarter, Types.jobQueue("testtopic"), Types.jobType("testtype"));
+        final Job added = builder.addProperties(properties).add();
+
+        // exactly one job must have reached the starter, and it must be the one returned by add().
+        assertEquals(1, queue.size());
+        final Job dequeued = queue.remove();
+        assertEquals(added, dequeued);
+        assertEquals(Types.jobQueue("testtopic"), dequeued.getQueue());
+        assertEquals("Jobname", dequeued.getProperties().get("job.name"));
+        assertNotNull(dequeued.getId());
+
+        // the creation time must fall inside the window spanned by this test.
+        final long latest = System.currentTimeMillis();
+        assertTrue(dequeued.getCreated() >= earliest);
+        assertTrue(dequeued.getCreated() <= latest);
+        assertEquals(Job.JobState.CREATED, dequeued.getJobState());
+
+        // the controller can be attached and detached again.
+        assertNull(dequeued.getController());
+        dequeued.setJobController(jobController);
+        assertEquals(jobController, dequeued.getController());
+        dequeued.removeJobController();
+        assertNull(dequeued.getController());
+
+        // a freshly created job has no result, timings or retries yet.
+        assertEquals("", dequeued.getResultMessage());
+        assertEquals(0, dequeued.getFinished());
+        assertEquals(0, dequeued.getStarted());
+        assertEquals(0, dequeued.getNumberOfRetries());
+        assertEquals(0, dequeued.getRetryCount());
+
+    }
+
+}
\ No newline at end of file

Propchange: 
sling/trunk/contrib/commons/mom/examples/jobs/core/src/test/java/org/apache/sling/jobs/impl/JobBuilderImplTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to