http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
new file mode 100644
index 0000000..d73a09b
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.attribute.expression.language.Query;
+import org.apache.nifi.attribute.expression.language.Query.Range;
+import 
org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.expression.ExpressionLanguageCompiler;
+
+public class MockValidationContext implements ValidationContext, 
ControllerServiceLookup {
+
+    private final MockProcessContext context;
+    private final Map<String, Boolean> expressionLanguageSupported;
+
+    public MockValidationContext(final MockProcessContext processContext) {
+        this.context = processContext;
+
+        final Map<PropertyDescriptor, String> properties = 
processContext.getProperties();
+        expressionLanguageSupported = new HashMap<>(properties.size());
+        for (final PropertyDescriptor descriptor : properties.keySet()) {
+            expressionLanguageSupported.put(descriptor.getName(), 
descriptor.isExpressionLanguageSupported());
+        }
+    }
+
+    @Override
+    public ControllerService getControllerService(final String identifier) {
+        return context.getControllerService(identifier);
+    }
+
+    @Override
+    public PropertyValue newPropertyValue(final String rawValue) {
+        return new MockPropertyValue(rawValue, this);
+    }
+
+    @Override
+    public ExpressionLanguageCompiler newExpressionLanguageCompiler() {
+        return new StandardExpressionLanguageCompiler();
+    }
+
+    @Override
+    public ValidationContext getControllerServiceValidationContext(final 
ControllerService controllerService) {
+        final MockProcessContext serviceProcessContext = new 
MockProcessContext(controllerService, context);
+        return new MockValidationContext(serviceProcessContext);
+    }
+
+    @Override
+    public PropertyValue getProperty(final PropertyDescriptor property) {
+        return context.getProperty(property);
+    }
+
+    @Override
+    public Map<PropertyDescriptor, String> getProperties() {
+        return context.getProperties();
+    }
+
+    @Override
+    public String getAnnotationData() {
+        return context.getAnnotationData();
+    }
+
+    @Override
+    public Set<String> getControllerServiceIdentifiers(Class<? extends 
ControllerService> serviceType) {
+        return context.getControllerServiceIdentifiers(serviceType);
+    }
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return this;
+    }
+
+    @Override
+    public boolean isControllerServiceEnabled(final String serviceIdentifier) {
+        return context.isControllerServiceEnabled(serviceIdentifier);
+    }
+
+    @Override
+    public boolean isControllerServiceEnabled(final ControllerService service) 
{
+        return context.isControllerServiceEnabled(service);
+    }
+
+    @Override
+    public String getControllerServiceName(final String serviceIdentifier) {
+        final ControllerServiceConfiguration configuration = 
context.getConfiguration(serviceIdentifier);
+        return configuration == null ? null : serviceIdentifier;
+    }
+
+    @Override
+    public boolean isValidationRequired(final ControllerService service) {
+        return true;
+    }
+
+    @Override
+    public boolean isControllerServiceEnabling(String serviceIdentifier) {
+        return context.isControllerServiceEnabling(serviceIdentifier);
+    }
+
+    public boolean isExpressionLanguagePresent(final String value) {
+        if (value == null) {
+            return false;
+        }
+
+        final List<Range> elRanges = Query.extractExpressionRanges(value);
+        return (elRanges != null && !elRanges.isEmpty());
+    }
+
+    @Override
+    public boolean isExpressionLanguageSupported(final String propertyName) {
+        final Boolean supported = 
expressionLanguageSupported.get(propertyName);
+        return Boolean.TRUE.equals(supported);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java
new file mode 100644
index 0000000..940eeea
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReflectionUtils {
+
+    private final static Logger LOG = 
LoggerFactory.getLogger(ReflectionUtils.class);
+
+    /**
+     * Invokes all methods on the given instance that have been annotated with
+     * the given Annotation. If the signature of the method that is defined in
+     * <code>instance</code> uses 1 or more parameters, those parameters must 
be
+     * specified by the <code>args</code> parameter. However, if more arguments
+     * are supplied by the <code>args</code> parameter than needed, the extra
+     * arguments will be ignored.
+     *
+     * @param annotation the annotation to look for
+     * @param instance to invoke a method of
+     * @param args to supply in a method call
+     * @throws InvocationTargetException ite
+     * @throws IllegalArgumentException iae
+     * @throws IllegalAccessException if not allowed to invoke that method
+     */
+    public static void invokeMethodsWithAnnotation(final Class<? extends 
Annotation> annotation, final Object instance, final Object... args)
+            throws IllegalAccessException, IllegalArgumentException, 
InvocationTargetException {
+        for (final Method method : instance.getClass().getMethods()) {
+            if (method.isAnnotationPresent(annotation)) {
+                final boolean isAccessible = method.isAccessible();
+                method.setAccessible(true);
+
+                try {
+                    final Class<?>[] argumentTypes = 
method.getParameterTypes();
+                    if (argumentTypes.length > args.length) {
+                        throw new 
IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s 
because method expects %3$s parameters but only %4$s were given",
+                                method.getName(), instance, 
argumentTypes.length, args.length));
+                    }
+
+                    for (int i = 0; i < argumentTypes.length; i++) {
+                        final Class<?> argType = argumentTypes[i];
+                        if (!argType.isAssignableFrom(args[i].getClass())) {
+                            throw new IllegalArgumentException(String.format(
+                                    "Unable to invoke method %1$s on %2$s 
because method parameter %3$s is expected to be of type %4$s but argument 
passed was of type %5$s",
+                                    method.getName(), instance, i, argType, 
args[i].getClass()));
+                        }
+                    }
+
+                    if (argumentTypes.length == args.length) {
+                        method.invoke(instance, args);
+                    } else {
+                        final Object[] argsToPass = new 
Object[argumentTypes.length];
+                        for (int i = 0; i < argsToPass.length; i++) {
+                            argsToPass[i] = args[i];
+                        }
+
+                        method.invoke(instance, argsToPass);
+                    }
+                } finally {
+                    if (!isAccessible) {
+                        method.setAccessible(false);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Invokes all methods on the given instance that have been annotated with
+     * the given Annotation. If the signature of the method that is defined in
+     * <code>instance</code> uses 1 or more parameters, those parameters must 
be
+     * specified by the <code>args</code> parameter. However, if more arguments
+     * are supplied by the <code>args</code> parameter than needed, the extra
+     * arguments will be ignored.
+     *
+     * @param annotation the annotation to look for
+     * @param instance to invoke a method of
+     * @param args to supply in a method call
+     * @return <code>true</code> if all appropriate methods were invoked and
+     * returned without throwing an Exception, <code>false</code> if one of the
+     * methods threw an Exception or could not be invoked; if 
<code>false</code>
+     * is returned, an error will have been logged.
+     */
+    public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? 
extends Annotation> annotation, final Object instance, final Object... args) {
+        for (final Method method : instance.getClass().getMethods()) {
+            if (method.isAnnotationPresent(annotation)) {
+                final boolean isAccessible = method.isAccessible();
+                method.setAccessible(true);
+
+                try {
+                    final Class<?>[] argumentTypes = 
method.getParameterTypes();
+                    if (argumentTypes.length > args.length) {
+                        LOG.error("Unable to invoke method {} on {} because 
method expects {} parameters but only {} were given",
+                                new Object[]{method.getName(), instance, 
argumentTypes.length, args.length});
+                        return false;
+                    }
+
+                    for (int i = 0; i < argumentTypes.length; i++) {
+                        final Class<?> argType = argumentTypes[i];
+                        if (!argType.isAssignableFrom(args[i].getClass())) {
+                            LOG.error("Unable to invoke method {} on {} 
because method parameter {} is expected to be of type {} but argument passed 
was of type {}",
+                                    new Object[]{method.getName(), instance, 
i, argType, args[i].getClass()});
+                            return false;
+                        }
+                    }
+
+                    try {
+                        if (argumentTypes.length == args.length) {
+                            method.invoke(instance, args);
+                        } else {
+                            final Object[] argsToPass = new 
Object[argumentTypes.length];
+                            for (int i = 0; i < argsToPass.length; i++) {
+                                argsToPass[i] = args[i];
+                            }
+
+                            method.invoke(instance, argsToPass);
+                        }
+                    } catch (final IllegalAccessException | 
IllegalArgumentException | InvocationTargetException t) {
+                        LOG.error("Unable to invoke method {} on {} due to 
{}", new Object[]{method.getName(), instance, t});
+                        LOG.error("", t);
+                        return false;
+                    }
+                } finally {
+                    if (!isAccessible) {
+                        method.setAccessible(false);
+                    }
+                }
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
new file mode 100644
index 0000000..994735b
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceReporter;
+
+/**
+ * Holds the pieces of state kept for a processor under test: its FlowFile
+ * queue, a provenance reporter together with the provenance events recorded
+ * so far, a FlowFile id generator, and a set of named counters.
+ */
+public class SharedSessionState {
+
+    private final MockFlowFileQueue flowFileQueue;
+    private final ProvenanceReporter provenanceReporter;
+    @SuppressWarnings("unused")
+    private final Processor processor;
+    private final AtomicLong flowFileIdGenerator;
+    private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>();
+    private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>();
+
+    /**
+     * @param processor the processor this state belongs to
+     * @param flowFileIdGenerator the source of FlowFile ids handed out by {@link #nextFlowFileId()}
+     */
+    public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) {
+        this.flowFileQueue = new MockFlowFileQueue();
+        // the reporter is given a freshly generated random UUID string on each construction
+        final String reporterId = UUID.randomUUID().toString();
+        this.provenanceReporter = new MockProvenanceReporter(null, this, reporterId, "N/A");
+        this.flowFileIdGenerator = flowFileIdGenerator;
+        this.processor = processor;
+    }
+
+    /** Appends the given events to the recorded set. */
+    void addProvenanceEvents(final Collection<ProvenanceEventRecord> events) {
+        this.events.addAll(events);
+    }
+
+    /** Discards every recorded provenance event. */
+    void clearProvenanceEvents() {
+        this.events.clear();
+    }
+
+    /**
+     * @return a new list holding the recorded provenance events; changes to the
+     * returned list do not affect this object
+     */
+    public List<ProvenanceEventRecord> getProvenanceEvents() {
+        final List<ProvenanceEventRecord> snapshot = new ArrayList<>(this.events);
+        return snapshot;
+    }
+
+    public MockFlowFileQueue getFlowFileQueue() {
+        return this.flowFileQueue;
+    }
+
+    public ProvenanceReporter getProvenanceReporter() {
+        return this.provenanceReporter;
+    }
+
+    /**
+     * @return the generator's current value, which is then incremented
+     */
+    public long nextFlowFileId() {
+        return this.flowFileIdGenerator.getAndIncrement();
+    }
+
+    /**
+     * Adds <code>delta</code> to the named counter, creating the counter with an
+     * initial value of zero if it does not exist yet.
+     *
+     * @param name the counter name
+     * @param delta the amount to add (may be negative)
+     */
+    public void adjustCounter(final String name, final long delta) {
+        AtomicLong target = counterMap.get(name);
+        if (target == null) {
+            final AtomicLong fresh = new AtomicLong(0L);
+            // putIfAbsent returns the counter already mapped, if another caller got there first
+            final AtomicLong alreadyMapped = counterMap.putIfAbsent(name, fresh);
+            target = (alreadyMapped == null) ? fresh : alreadyMapped;
+        }
+
+        target.addAndGet(delta);
+    }
+
+    /**
+     * @param name the counter name
+     * @return the counter's current value, or <code>null</code> if no counter
+     * with that name has been adjusted
+     */
+    public Long getCounterValue(final String name) {
+        final AtomicLong counter = counterMap.get(name);
+        if (counter == null) {
+            return null;
+        }
+        return counter.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/SingleSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/SingleSessionFactory.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/SingleSessionFactory.java
new file mode 100644
index 0000000..ef518dc
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/SingleSessionFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+
+/**
+ * A {@link ProcessSessionFactory} that hands out the one session it was
+ * constructed with, no matter how many times a session is requested.
+ */
+public class SingleSessionFactory implements ProcessSessionFactory {
+
+    private final MockProcessSession session;
+
+    /**
+     * @param session the session that every call to {@link #createSession()} returns
+     */
+    public SingleSessionFactory(final MockProcessSession session) {
+        this.session = session;
+    }
+
+    /**
+     * @return the session supplied at construction; no new session is created
+     */
+    @Override
+    public ProcessSession createSession() {
+        return this.session;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
new file mode 100644
index 0000000..048e2b9
--- /dev/null
+++ 
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.Assert;
+
+public class StandardProcessorTestRunner implements TestRunner {
+
+    // Collaborators created once in the constructor for the processor being run.
+    private final Processor processor;
+    private final MockProcessContext context;
+    private final MockFlowFileQueue flowFileQueue;
+    private final MockSessionFactory sessionFactory;
+    private final SharedSessionState sharedState;
+    private final AtomicLong idGenerator;
+    // true when the processor's class carries the @TriggerSerially annotation
+    private final boolean triggerSerially;
+
+    // size of the thread pool used by run(...) to trigger the processor
+    private int numThreads = 1;
+    // incremented once per RunProcessor.call()
+    private final AtomicInteger invocations = new AtomicInteger(0);
+
+    // Deprecated annotation types, filled in by populateDeprecatedMethods() and
+    // checked against the processor by detectDeprecatedAnnotations(...).
+    private static final Set<Class<? extends Annotation>> 
deprecatedTypeAnnotations = new HashSet<>();
+    private static final Set<Class<? extends Annotation>> 
deprecatedMethodAnnotations = new HashSet<>();
+
+    static {
+        // The sets are filled in by a dedicated method rather than inline so that
+        // the method can carry a @SuppressWarnings annotation, making it explicit
+        // that the deprecated classes are referenced on purpose.
+        populateDeprecatedMethods();
+    }
+
+    /**
+     * Sets up the mock environment for the given processor: creates the shared
+     * state, session factory and process context, fails the test if the
+     * processor uses a deprecated annotation, initializes the processor, and
+     * then invokes its {@link OnAdded} methods.
+     *
+     * @param processor the processor to run
+     */
+    StandardProcessorTestRunner(final Processor processor) {
+        this.processor = processor;
+        this.idGenerator = new AtomicLong(0L);
+        this.sharedState = new SharedSessionState(processor, idGenerator);
+        this.flowFileQueue = sharedState.getFlowFileQueue();
+        this.sessionFactory = new MockSessionFactory(sharedState, processor);
+        this.context = new MockProcessContext(processor);
+
+        detectDeprecatedAnnotations(processor);
+
+        processor.initialize(new MockProcessorInitializationContext(processor, context));
+
+        try {
+            ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
+        } catch (final Exception onAddedFailure) {
+            Assert.fail("Could not invoke methods annotated with @OnAdded annotation due to: " + onAddedFailure);
+        }
+
+        this.triggerSerially = processor.getClass().isAnnotationPresent(TriggerSerially.class);
+    }
+
+    @SuppressWarnings("deprecation")
+    private static void populateDeprecatedMethods() {
+        
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.CapabilityDescription.class);
+        
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.EventDriven.class);
+        
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.SideEffectFree.class);
+        
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.SupportsBatching.class);
+        
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.Tags.class);
+        
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class);
+        
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class);
+        
deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerSerially.class);
+
+        
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnRemoved.class);
+        
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnAdded.class);
+        
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnScheduled.class);
+        
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnShutdown.class);
+        
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnStopped.class);
+        
deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnUnscheduled.class);
+    }
+
+    private static void detectDeprecatedAnnotations(final Processor processor) 
{
+        for (final Class<? extends Annotation> annotationClass : 
deprecatedTypeAnnotations) {
+            if (processor.getClass().isAnnotationPresent(annotationClass)) {
+                Assert.fail("Processor is using deprecated Annotation " + 
annotationClass.getCanonicalName());
+            }
+        }
+
+        for (final Class<? extends Annotation> annotationClass : 
deprecatedMethodAnnotations) {
+            for (final Method method : processor.getClass().getMethods()) {
+                if (method.isAnnotationPresent(annotationClass)) {
+                    Assert.fail("Processor is using deprecated Annotation " + 
annotationClass.getCanonicalName() + " for method " + method);
+                }
+            }
+        }
+
+    }
+
+    /** Forwards the flag to the process context. */
+    @Override
+    public void setValidateExpressionUsage(final boolean validate) {
+        this.context.setValidateExpressionUsage(validate);
+    }
+
+    /** @return the processor this runner was created for */
+    @Override
+    public Processor getProcessor() {
+        return this.processor;
+    }
+
+    /** @return the mock process context created for the processor */
+    @Override
+    public MockProcessContext getProcessContext() {
+        return this.context;
+    }
+
+    /** Runs a single iteration; see {@link #run(int)}. */
+    @Override
+    public void run() {
+        this.run(1);
+    }
+
+    /** Runs the given number of iterations with <code>stopOnFinish</code> set to <code>true</code>. */
+    @Override
+    public void run(int iterations) {
+        this.run(iterations, true);
+    }
+
+    /** Runs with <code>initialize</code> set to <code>true</code>. */
+    @Override
+    public void run(final int iterations, final boolean stopOnFinish) {
+        this.run(iterations, stopOnFinish, true);
+    }
+
+    /** Runs with a <code>runWait</code> of 5000 milliseconds. */
+    @Override
+    public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) {
+        this.run(iterations, stopOnFinish, initialize, 5000);
+    }
+
+    /**
+     * Triggers the processor <code>iterations</code> times on a fixed pool of
+     * <code>numThreads</code> threads and drives the surrounding lifecycle:
+     * <ol>
+     * <li>asserts the context is valid and enables expression validation;</li>
+     * <li>if <code>initialize</code>, invokes the {@link OnScheduled} methods;</li>
+     * <li>submits the iterations, then waits up to <code>runWait</code>
+     * milliseconds for the pool to terminate;</li>
+     * <li>collects every result, rethrowing the first Throwable a trigger
+     * produced as an {@link AssertionError};</li>
+     * <li>invokes the {@link OnUnscheduled} methods exactly once - right after
+     * the first successful result, or after the loop if there was none;</li>
+     * <li>if <code>stopOnFinish</code>, invokes the {@link OnStopped} methods.</li>
+     * </ol>
+     * Expression validation is always disabled again before returning. If the
+     * calling thread is interrupted while waiting, waiting continues as before
+     * but the thread's interrupt status is restored before this method returns.
+     *
+     * @param iterations number of times to trigger the processor; must be at least 1
+     * @param stopOnFinish whether to invoke the {@link OnStopped} methods at the end
+     * @param initialize whether to invoke the {@link OnScheduled} methods first
+     * @param runWait maximum time, in milliseconds, to wait for pool termination
+     * @throws IllegalArgumentException if <code>iterations</code> is less than 1
+     */
+    @Override
+    public void run(final int iterations, final boolean stopOnFinish, final boolean initialize, final long runWait) {
+        if (iterations < 1) {
+            throw new IllegalArgumentException("iterations must be at least 1 but was " + iterations);
+        }
+
+        context.assertValid();
+        context.enableExpressionValidation();
+
+        // remembers an interrupt received while waiting so it can be re-asserted on exit
+        boolean interrupted = false;
+        try {
+            if (initialize) {
+                try {
+                    ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context);
+                } catch (final Exception e) {
+                    // Assert.fail only carries the message, so the trace is printed to keep the cause visible
+                    e.printStackTrace();
+                    Assert.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e);
+                }
+            }
+
+            final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+            final List<Future<Throwable>> futures = new ArrayList<>(iterations);
+            for (int i = 0; i < iterations; i++) {
+                futures.add(executorService.submit(new RunProcessor()));
+            }
+
+            executorService.shutdown();
+            try {
+                executorService.awaitTermination(runWait, TimeUnit.MILLISECONDS);
+            } catch (final InterruptedException e) {
+                interrupted = true;
+            }
+
+            int finishedCount = 0;
+            boolean unscheduledRun = false;
+            for (final Future<Throwable> future : futures) {
+                final Throwable thrown;
+                try {
+                    thrown = future.get();   // wait for the result
+                } catch (final InterruptedException e) {
+                    interrupted = true;
+                    continue;
+                } catch (final Exception ignored) {
+                    // a result that cannot be obtained is skipped, as before
+                    continue;
+                }
+
+                if (thrown != null) {
+                    throw new AssertionError(thrown);
+                }
+
+                if (++finishedCount == 1) {
+                    unscheduledRun = true;
+                    try {
+                        ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
+                    } catch (final Exception e) {
+                        Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
+                    }
+                }
+            }
+
+            if (!unscheduledRun) {
+                try {
+                    ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context);
+                } catch (final Exception e) {
+                    Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e);
+                }
+            }
+
+            if (stopOnFinish) {
+                try {
+                    ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor);
+                } catch (final Exception e) {
+                    Assert.fail("Could not invoke methods annotated with @OnStopped annotation due to: " + e);
+                }
+            }
+        } finally {
+            context.disableExpressionValidation();
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Invokes every method of the processor that is annotated with
+     * {@code @OnShutdown}. If the invocation raises an exception, the test is
+     * failed through {@code Assert.fail}.
+     */
+    @Override
+    public void shutdown() {
+        try {
+            ReflectionUtils.invokeMethodsWithAnnotation(OnShutdown.class, processor);
+        } catch (final Exception failure) {
+            final String message = "Could not invoke methods annotated with @OnShutdown annotation due to: " + failure;
+            Assert.fail(message);
+        }
+    }
+
+    private class RunProcessor implements Callable<Throwable> {
+
+        @Override
+        public Throwable call() throws Exception {
+            invocations.incrementAndGet();
+            try {
+                processor.onTrigger(context, sessionFactory);
+            } catch (final Throwable t) {
+                return t;
+            }
+
+            return null;
+        }
+    }
+
+    /**
+     * @return the session factory that this runner passes to the processor's
+     * {@code onTrigger} method
+     */
+    @Override
+    public ProcessSessionFactory getProcessSessionFactory() {
+        final ProcessSessionFactory factory = sessionFactory;
+        return factory;
+    }
+
+    /**
+     * Delegates the "all FlowFiles transferred" assertion to every session
+     * that the session factory has created.
+     *
+     * @param relationship name of the relationship to verify
+     */
+    @Override
+    public void assertAllFlowFilesTransferred(final String relationship) {
+        for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+            createdSession.assertAllFlowFilesTransferred(relationship);
+        }
+    }
+
+    /**
+     * Delegates the "all FlowFiles transferred" assertion to every session
+     * that the session factory has created.
+     *
+     * @param relationship the relationship to verify
+     */
+    @Override
+    public void assertAllFlowFilesTransferred(final Relationship relationship) {
+        for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+            createdSession.assertAllFlowFilesTransferred(relationship);
+        }
+    }
+
+    /**
+     * Runs {@link #assertAllFlowFilesTransferred(String)} followed by
+     * {@link #assertTransferCount(String, int)} for the same relationship.
+     *
+     * @param relationship name of the relationship to verify
+     * @param count number of FlowFiles expected on that relationship
+     */
+    @Override
+    public void assertAllFlowFilesTransferred(final String relationship, final int count) {
+        // First check the destination, then check the amount.
+        assertAllFlowFilesTransferred(relationship);
+        assertTransferCount(relationship, count);
+    }
+
+    /**
+     * Runs {@link #assertAllFlowFilesTransferred(Relationship)} followed by
+     * {@link #assertTransferCount(Relationship, int)} for the same relationship.
+     *
+     * @param relationship the relationship to verify
+     * @param count number of FlowFiles expected on that relationship
+     */
+    @Override
+    public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) {
+        // First check the destination, then check the amount.
+        assertAllFlowFilesTransferred(relationship);
+        assertTransferCount(relationship, count);
+    }
+
+    /**
+     * Asserts that the number of FlowFiles returned by
+     * {@link #getFlowFilesForRelationship(Relationship)} equals {@code count}.
+     *
+     * @param relationship the relationship to verify
+     * @param count expected number of FlowFiles
+     */
+    @Override
+    public void assertTransferCount(final Relationship relationship, final int count) {
+        final int actual = getFlowFilesForRelationship(relationship).size();
+        Assert.assertEquals(count, actual);
+    }
+
+    /**
+     * Asserts that the number of FlowFiles returned by
+     * {@link #getFlowFilesForRelationship(String)} equals {@code count}.
+     *
+     * @param relationship name of the relationship to verify
+     * @param count expected number of FlowFiles
+     */
+    @Override
+    public void assertTransferCount(final String relationship, final int count) {
+        final int actual = getFlowFilesForRelationship(relationship).size();
+        Assert.assertEquals(count, actual);
+    }
+
+    /**
+     * Delegates the validity assertion to the process context of this runner.
+     */
+    @Override
+    public void assertValid() {
+        context.assertValid();
+    }
+
+    @Override
+    public void assertNotValid() {
+        Assert.assertFalse("Processor appears to be valid but expected it to 
be invalid", context.isValid());
+    }
+
+    /**
+     * @return {@code true} if the runner's FlowFile queue reports that it is
+     * empty, {@code false} otherwise
+     */
+    @Override
+    public boolean isQueueEmpty() {
+        final boolean empty = flowFileQueue.isEmpty();
+        return empty;
+    }
+
+    /**
+     * Fails the test unless the runner's FlowFile queue is empty.
+     */
+    @Override
+    public void assertQueueEmpty() {
+        final boolean empty = flowFileQueue.isEmpty();
+        Assert.assertTrue(empty);
+    }
+
+    /**
+     * Fails the test if the runner's FlowFile queue is empty.
+     */
+    @Override
+    public void assertQueueNotEmpty() {
+        final boolean empty = flowFileQueue.isEmpty();
+        Assert.assertFalse(empty);
+    }
+
+    /**
+     * Asks every session created by the session factory to clear its
+     * transfer state.
+     */
+    @Override
+    public void clearTransferState() {
+        for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+            createdSession.clearTransferState();
+        }
+    }
+
+    /**
+     * Offers each of the given FlowFiles, in order, to the runner's FlowFile
+     * queue. Every element is cast to {@link MockFlowFile}.
+     *
+     * @param flowFiles the FlowFiles to enqueue
+     */
+    @Override
+    public void enqueue(final FlowFile... flowFiles) {
+        for (int i = 0; i < flowFiles.length; i++) {
+            final MockFlowFile mockFlowFile = (MockFlowFile) flowFiles[i];
+            flowFileQueue.offer(mockFlowFile);
+        }
+    }
+
+    /**
+     * Enqueues the content found at the given path by calling
+     * {@link #enqueue(Path, Map)} with a new, empty attribute map.
+     *
+     * @param path location of the content
+     * @throws IOException if thrown by {@link #enqueue(Path, Map)}
+     */
+    @Override
+    public void enqueue(final Path path) throws IOException {
+        final Map<String, String> noAttributes = new HashMap<>();
+        enqueue(path, noAttributes);
+    }
+
+    /**
+     * Opens the file at the given path and enqueues its content through
+     * {@link #enqueue(InputStream, Map)}. The supplied attributes are copied;
+     * if the copy has no entry for {@code CoreAttributes.FILENAME}, the last
+     * name element of the path is added under that key.
+     *
+     * @param path location of the content
+     * @param attributes attributes for the new FlowFile; not modified
+     * @throws IOException if the file cannot be opened or closed
+     */
+    @Override
+    public void enqueue(final Path path, final Map<String, String> attributes) throws IOException {
+        final Map<String, String> modifiedAttributes = new HashMap<>(attributes);
+        if (!modifiedAttributes.containsKey(CoreAttributes.FILENAME.key())) {
+            // Path#toFile() throws UnsupportedOperationException for paths that do not belong to the
+            // default file system, while Files.newInputStream below works with any provider, so take
+            // the name from the Path itself. getFileName() is null for a path without name elements
+            // (e.g. a root); File#getName() yields "" in that case, which is kept here.
+            final Path fileName = path.getFileName();
+            modifiedAttributes.put(CoreAttributes.FILENAME.key(), fileName == null ? "" : fileName.toString());
+        }
+        try (final InputStream in = Files.newInputStream(path)) {
+            enqueue(in, modifiedAttributes);
+        }
+    }
+
+    /**
+     * Enqueues the given bytes by calling {@link #enqueue(byte[], Map)} with
+     * a new, empty attribute map.
+     *
+     * @param data content of the new FlowFile
+     */
+    @Override
+    public void enqueue(final byte[] data) {
+        final Map<String, String> noAttributes = new HashMap<>();
+        enqueue(data, noAttributes);
+    }
+
+    /**
+     * Wraps the given bytes in a {@link ByteArrayInputStream} and hands them
+     * to {@link #enqueue(InputStream, Map)}.
+     *
+     * @param data content of the new FlowFile
+     * @param attributes attributes for the new FlowFile
+     */
+    @Override
+    public void enqueue(final byte[] data, final Map<String, String> attributes) {
+        final InputStream content = new ByteArrayInputStream(data);
+        enqueue(content, attributes);
+    }
+
+    /**
+     * Enqueues the given stream by calling {@link #enqueue(InputStream, Map)}
+     * with a new, empty attribute map.
+     *
+     * @param data source of the content
+     */
+    @Override
+    public void enqueue(final InputStream data) {
+        final Map<String, String> noAttributes = new HashMap<>();
+        enqueue(data, noAttributes);
+    }
+
+    /**
+     * Builds a FlowFile in a new {@link MockProcessSession} (backed by a new
+     * {@link SharedSessionState}), imports the stream into it, applies the
+     * given attributes and enqueues the result.
+     *
+     * @param data source of the content
+     * @param attributes attributes for the new FlowFile
+     */
+    @Override
+    public void enqueue(final InputStream data, final Map<String, String> attributes) {
+        final SharedSessionState sessionState = new SharedSessionState(processor, idGenerator);
+        final MockProcessSession session = new MockProcessSession(sessionState, processor);
+
+        final MockFlowFile created = session.create();
+        final MockFlowFile withContent = session.importFrom(data, created);
+        final MockFlowFile withAttributes = session.putAllAttributes(withContent, attributes);
+
+        enqueue(withAttributes);
+    }
+
+    /**
+     * @param flowFile the FlowFile whose content is wanted
+     * @return the value of {@code flowFile.getData()}
+     */
+    @Override
+    public byte[] getContentAsByteArray(final MockFlowFile flowFile) {
+        final byte[] content = flowFile.getData();
+        return content;
+    }
+
+    /**
+     * Builds a {@link Relationship} with the given name and delegates to
+     * {@link #getFlowFilesForRelationship(Relationship)}.
+     *
+     * @param relationship name of the relationship
+     * @return the FlowFiles reported for that relationship
+     */
+    @Override
+    public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) {
+        return getFlowFilesForRelationship(new Relationship.Builder().name(relationship).build());
+    }
+
+    /**
+     * Gathers the FlowFiles for the given relationship from every session
+     * created by the session factory and returns them ordered by ascending
+     * creation time.
+     *
+     * @param relationship the relationship of interest
+     * @return a new, sorted list of the matching FlowFiles
+     */
+    @Override
+    public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) {
+        final List<MockFlowFile> collected = new ArrayList<>();
+        for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+            collected.addAll(createdSession.getFlowFilesForRelationship(relationship));
+        }
+
+        final Comparator<MockFlowFile> byCreationTime = new Comparator<MockFlowFile>() {
+            @Override
+            public int compare(final MockFlowFile first, final MockFlowFile second) {
+                return Long.compare(first.getCreationTime(), second.getCreationTime());
+            }
+        };
+        Collections.sort(collected, byCreationTime);
+
+        return collected;
+    }
+
+    /**
+     * Returns the provenance reporter held by the shared session state.
+     *
+     * @return the provenance reporter of the shared session state
+     * @deprecated The ProvenanceReporter should not be accessed through the
+     * test runner, as it does not expose the events that were emitted.
+     */
+    @Override
+    @Deprecated
+    public ProvenanceReporter getProvenanceReporter() {
+        final ProvenanceReporter reporter = sharedState.getProvenanceReporter();
+        return reporter;
+    }
+
+    /**
+     * @return the size reported by the runner's FlowFile queue
+     */
+    @Override
+    public QueueSize getQueueSize() {
+        final QueueSize queueSize = flowFileQueue.size();
+        return queueSize;
+    }
+
+    /**
+     * @param name name of the counter
+     * @return the value that the shared session state reports for that counter
+     */
+    @Override
+    public Long getCounterValue(final String name) {
+        final Long counterValue = sharedState.getCounterValue(name);
+        return counterValue;
+    }
+
+    /**
+     * @return the sum of the removed counts reported by every session that
+     * the session factory has created
+     */
+    @Override
+    public int getRemovedCount() {
+        int removedTotal = 0;
+        for (final MockProcessSession createdSession : sessionFactory.getCreatedSessions()) {
+            removedTotal += createdSession.getRemovedCount();
+        }
+        return removedTotal;
+    }
+
+    /**
+     * Passes the given annotation data on to the process context.
+     *
+     * @param annotationData the annotation data to set
+     */
+    @Override
+    public void setAnnotationData(final String annotationData) {
+        context.setAnnotationData(annotationData);
+    }
+
+    /**
+     * Sets a processor property by name through the process context.
+     *
+     * @param propertyName name of the property
+     * @param propertyValue value to assign
+     * @return the result returned by the process context
+     */
+    @Override
+    public ValidationResult setProperty(final String propertyName, final String propertyValue) {
+        final ValidationResult outcome = context.setProperty(propertyName, propertyValue);
+        return outcome;
+    }
+
+    /**
+     * Sets a processor property by descriptor through the process context.
+     *
+     * @param descriptor descriptor of the property
+     * @param value value to assign
+     * @return the result returned by the process context
+     */
+    @Override
+    public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) {
+        final ValidationResult outcome = context.setProperty(descriptor, value);
+        return outcome;
+    }
+
+    /**
+     * Sets a processor property to the value carried by the given
+     * {@link AllowableValue}, through the process context.
+     *
+     * @param descriptor descriptor of the property
+     * @param value allowable value whose {@code getValue()} is assigned
+     * @return the result returned by the process context
+     */
+    @Override
+    public ValidationResult setProperty(final PropertyDescriptor descriptor, final AllowableValue value) {
+        final String rawValue = value.getValue();
+        return context.setProperty(descriptor, rawValue);
+    }
+
+    /**
+     * Stores the number of threads for this runner. Asking for more than one
+     * thread while the processor is triggered serially fails the test.
+     *
+     * @param threadCount number of threads
+     */
+    @Override
+    public void setThreadCount(final int threadCount) {
+        final boolean multiThreaded = threadCount > 1;
+        if (triggerSerially && multiThreaded) {
+            Assert.fail("Cannot set thread-count higher than 1 because the processor is triggered serially");
+        }
+
+        numThreads = threadCount;
+    }
+
+    /**
+     * @return the currently stored number of threads
+     */
+    @Override
+    public int getThreadCount() {
+        final int configuredThreads = numThreads;
+        return configuredThreads;
+    }
+
+    /**
+     * Removes the given relationship from a copy of the context's unavailable
+     * relationships and stores that copy back on the context.
+     *
+     * @param relationship the relationship to mark as available
+     */
+    @Override
+    public void setRelationshipAvailable(final Relationship relationship) {
+        final Set<Relationship> stillUnavailable = new HashSet<>(context.getUnavailableRelationships());
+        stillUnavailable.remove(relationship);
+        context.setUnavailableRelationships(stillUnavailable);
+    }
+
+    /**
+     * Builds a {@link Relationship} with the given name and delegates to
+     * {@link #setRelationshipAvailable(Relationship)}.
+     *
+     * @param relationshipName name of the relationship to mark as available
+     */
+    @Override
+    public void setRelationshipAvailable(final String relationshipName) {
+        final Relationship namedRelationship = new Relationship.Builder().name(relationshipName).build();
+        setRelationshipAvailable(namedRelationship);
+    }
+
+    /**
+     * Adds the given relationship to a copy of the context's unavailable
+     * relationships and stores that copy back on the context.
+     *
+     * @param relationship the relationship to mark as unavailable
+     */
+    @Override
+    public void setRelationshipUnavailable(final Relationship relationship) {
+        final Set<Relationship> nowUnavailable = new HashSet<>(context.getUnavailableRelationships());
+        nowUnavailable.add(relationship);
+        context.setUnavailableRelationships(nowUnavailable);
+    }
+
+    /**
+     * Builds a {@link Relationship} with the given name and delegates to
+     * {@link #setRelationshipUnavailable(Relationship)}.
+     *
+     * @param relationshipName name of the relationship to mark as unavailable
+     */
+    @Override
+    public void setRelationshipUnavailable(final String relationshipName) {
+        final Relationship namedRelationship = new Relationship.Builder().name(relationshipName).build();
+        setRelationshipUnavailable(namedRelationship);
+    }
+
+    /**
+     * Registers a Controller Service without any properties by calling
+     * {@link #addControllerService(String, ControllerService, Map)} with a
+     * new, empty map.
+     *
+     * @param identifier identifier of the service
+     * @param service the service to add
+     * @throws InitializationException if thrown by the three-argument overload
+     */
+    @Override
+    public void addControllerService(final String identifier, final ControllerService service) throws InitializationException {
+        final Map<String, String> noProperties = new HashMap<>();
+        addControllerService(identifier, service, noProperties);
+    }
+
+    /**
+     * Initializes the given Controller Service, resolves the supplied
+     * property names to descriptors via the service, invokes the service's
+     * {@code @OnAdded} methods and finally registers the service with the
+     * process context.
+     *
+     * @param identifier identifier of the service; must not be null
+     * @param service the service to add; must not be null
+     * @param properties property values keyed by property name
+     * @throws InitializationException if invoking the {@code @OnAdded}
+     * methods fails, or if thrown by the service's {@code initialize}
+     */
+    @Override
+    public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException {
+        // NOTE: services that still use the deprecated
+        // org.apache.nifi.controller.annotation.OnConfigured annotation are deliberately
+        // not failed here for now; that check is to be introduced later.
+
+        final ComponentLog serviceLogger = new MockProcessorLog(identifier, service);
+        final MockControllerServiceInitializationContext initializationContext =
+                new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), serviceLogger);
+        service.initialize(initializationContext);
+
+        final Map<PropertyDescriptor, String> propsByDescriptor = new HashMap<>();
+        for (final Map.Entry<String, String> property : properties.entrySet()) {
+            final PropertyDescriptor descriptor = service.getPropertyDescriptor(property.getKey());
+            propsByDescriptor.put(descriptor, property.getValue());
+        }
+
+        try {
+            ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, service);
+        } catch (final InvocationTargetException | IllegalAccessException | IllegalArgumentException e) {
+            throw new InitializationException(e);
+        }
+
+        context.addControllerService(identifier, service, propsByDescriptor, null);
+    }
+
+    @Override
+    public void assertNotValid(final ControllerService service) {
+        final ValidationContext validationContext = new 
MockValidationContext(context).getControllerServiceValidationContext(service);
+        final Collection<ValidationResult> results = 
context.getControllerService(service.getIdentifier()).validate(validationContext);
+
+        for (final ValidationResult result : results) {
+            if (!result.isValid()) {
+                return;
+            }
+        }
+
+        Assert.fail("Expected Controller Service " + service + " to be invalid 
but it is valid");
+    }
+
+    /**
+     * Validates the Controller Service registered under the given service's
+     * identifier and fails the test on the first invalid validation result.
+     *
+     * @param service the service expected to be valid
+     */
+    @Override
+    public void assertValid(final ControllerService service) {
+        final ValidationContext serviceContext = new MockValidationContext(context).getControllerServiceValidationContext(service);
+        final Collection<ValidationResult> outcomes = context.getControllerService(service.getIdentifier()).validate(serviceContext);
+
+        for (final ValidationResult outcome : outcomes) {
+            if (outcome.isValid()) {
+                continue;
+            }
+            Assert.fail("Expected Controller Service to be valid but it is invalid due to: " + outcome.toString());
+        }
+    }
+
+    /**
+     * Invokes the service's {@code @OnDisabled} methods and marks its
+     * configuration as disabled. A failure while invoking those methods is
+     * printed and fails the test.
+     *
+     * @param service the service to disable
+     * @throws IllegalArgumentException if the context has no configuration for the service
+     * @throws IllegalStateException if the service is not currently enabled
+     */
+    @Override
+    public void disableControllerService(final ControllerService service) {
+        final ControllerServiceConfiguration serviceConfig = context.getConfiguration(service.getIdentifier());
+        if (serviceConfig == null) {
+            throw new IllegalArgumentException("Controller Service " + service + " is not known");
+        } else if (!serviceConfig.isEnabled()) {
+            throw new IllegalStateException("Controller service " + service + " cannot be disabled because it is not enabled");
+        }
+
+        try {
+            ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service);
+        } catch (final Exception failure) {
+            failure.printStackTrace();
+            Assert.fail("Failed to disable Controller Service " + service + " due to " + failure);
+        }
+
+        serviceConfig.setEnabled(false);
+    }
+
+    /**
+     * Invokes the service's {@code @OnEnabled} methods with a
+     * {@link MockConfigurationContext} built from its configured properties
+     * and marks its configuration as enabled. A failure while invoking those
+     * methods is printed and fails the test.
+     *
+     * @param service the service to enable
+     * @throws IllegalArgumentException if the context has no configuration for the service
+     * @throws IllegalStateException if the service is already enabled
+     */
+    @Override
+    public void enableControllerService(final ControllerService service) {
+        final ControllerServiceConfiguration serviceConfig = context.getConfiguration(service.getIdentifier());
+        if (serviceConfig == null) {
+            throw new IllegalArgumentException("Controller Service " + service + " is not known");
+        } else if (serviceConfig.isEnabled()) {
+            throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is not disabled");
+        }
+
+        try {
+            final ConfigurationContext enableContext = new MockConfigurationContext(service, serviceConfig.getProperties(), context);
+            ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, enableContext);
+        } catch (final InvocationTargetException ite) {
+            // Report the exception thrown by the annotated method itself rather than the reflective wrapper.
+            final Throwable rootCause = ite.getCause();
+            rootCause.printStackTrace();
+            Assert.fail("Failed to enable Controller Service " + service + " due to " + rootCause);
+        } catch (final Exception failure) {
+            failure.printStackTrace();
+            Assert.fail("Failed to enable Controller Service " + service + " due to " + failure);
+        }
+
+        serviceConfig.setEnabled(true);
+    }
+
+    /**
+     * @param service the service to inspect
+     * @return whether the service's configuration is flagged as enabled
+     * @throws IllegalArgumentException if the context has no configuration for the service
+     */
+    @Override
+    public boolean isControllerServiceEnabled(final ControllerService service) {
+        final ControllerServiceConfiguration serviceConfig = context.getConfiguration(service.getIdentifier());
+        if (serviceConfig != null) {
+            return serviceConfig.isEnabled();
+        }
+
+        throw new IllegalArgumentException("Controller Service " + service + " is not known");
+    }
+
+    /**
+     * Removes the given Controller Service from the process context. The
+     * service is disabled first if it is currently enabled, then its
+     * {@code @OnRemoved} methods are invoked. A failure while invoking those
+     * methods is printed and fails the test.
+     *
+     * @param service the service to remove
+     * @throws IllegalArgumentException if the context has no configuration for the service
+     */
+    @Override
+    public void removeControllerService(final ControllerService service) {
+        // disableControllerService(...) throws IllegalStateException for a service that is not enabled,
+        // which made it impossible to remove a service that was never enabled or already disabled.
+        // isControllerServiceEnabled(...) still rejects unknown services with IllegalArgumentException.
+        if (isControllerServiceEnabled(service)) {
+            disableControllerService(service);
+        }
+
+        try {
+            ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, service);
+        } catch (final Exception e) {
+            e.printStackTrace();
+            Assert.fail("Failed to remove Controller Service " + service + " due to " + e);
+        }
+
+        context.removeControllerService(service);
+    }
+
+    /**
+     * Stores annotation data on the configuration of the given Controller
+     * Service, which is obtained through
+     * {@link #getConfigToUpdate(ControllerService)}.
+     *
+     * @param service the service to update
+     * @param annotationData the annotation data to set
+     */
+    @Override
+    public void setAnnotationData(final ControllerService service, final String annotationData) {
+        getConfigToUpdate(service).setAnnotationData(annotationData);
+    }
+
+    /**
+     * Looks up the configuration of a Controller Service that is about to be
+     * modified.
+     *
+     * @param service the service to be modified
+     * @return the configuration the context holds for the service
+     * @throws IllegalArgumentException if the context has no configuration for the service
+     * @throws IllegalStateException if the service is currently enabled
+     */
+    private ControllerServiceConfiguration getConfigToUpdate(final ControllerService service) {
+        final ControllerServiceConfiguration serviceConfig = context.getConfiguration(service.getIdentifier());
+        if (serviceConfig == null) {
+            throw new IllegalArgumentException("Controller Service " + service + " is not known");
+        } else if (serviceConfig.isEnabled()) {
+            throw new IllegalStateException("Controller service " + service + " cannot be modified because it is not disabled");
+        }
+
+        return serviceConfig;
+    }
+
+    /**
+     * Sets a Controller Service property to the value carried by the given
+     * {@link AllowableValue}.
+     *
+     * @param service the service to update
+     * @param property descriptor of the property
+     * @param value allowable value whose {@code getValue()} is assigned
+     * @return the result of the String-valued overload
+     */
+    @Override
+    public ValidationResult setProperty(final ControllerService service, final PropertyDescriptor property, final AllowableValue value) {
+        final String rawValue = value.getValue();
+        return setProperty(service, property, rawValue);
+    }
+
+    /**
+     * Validates the value against the descriptor and then stores it in the
+     * service's configuration. The value is stored whatever the validation
+     * outcome is; the outcome is only returned.
+     *
+     * @param service the service to update; must not be enabled
+     * @param property descriptor of the property
+     * @param value value to assign
+     * @return the result of validating {@code value} with {@code property}
+     */
+    @Override
+    public ValidationResult setProperty(final ControllerService service, final PropertyDescriptor property, final String value) {
+        final ControllerServiceConfiguration serviceConfig = getConfigToUpdate(service);
+        final Map<PropertyDescriptor, String> updatedProps = new HashMap<>(serviceConfig.getProperties());
+
+        final MockValidationContext runnerContext = new MockValidationContext(context);
+        final ValidationContext serviceContext = runnerContext.getControllerServiceValidationContext(service);
+        final ValidationResult outcome = property.validate(value, serviceContext);
+
+        updatedProps.put(property, value);
+        serviceConfig.setProperties(updatedProps);
+
+        return outcome;
+    }
+
+    /**
+     * Resolves the property name to a descriptor via the service and
+     * delegates to the descriptor-based overload. When the service returns
+     * no descriptor, an invalid {@link ValidationResult} is returned and
+     * nothing is stored.
+     *
+     * @param service the service to update
+     * @param propertyName name of the property
+     * @param value value to assign
+     * @return the validation result
+     */
+    @Override
+    public ValidationResult setProperty(final ControllerService service, final String propertyName, final String value) {
+        final PropertyDescriptor resolved = service.getPropertyDescriptor(propertyName);
+        if (resolved != null) {
+            return setProperty(service, resolved, value);
+        }
+
+        return new ValidationResult.Builder()
+                .input(propertyName)
+                .explanation(propertyName + " is not a known Property for Controller Service " + service)
+                .subject("Invalid property")
+                .valid(false)
+                .build();
+    }
+
+    /**
+     * @param identifier identifier of the service
+     * @return the service that the process context returns for that identifier
+     */
+    @Override
+    public ControllerService getControllerService(final String identifier) {
+        final ControllerService registered = context.getControllerService(identifier);
+        return registered;
+    }
+
+    /**
+     * @param identifier identifier of the service
+     * @param serviceType type to cast the service to
+     * @param <T> the requested service type
+     * @return the service that the process context returns for that
+     * identifier, cast with {@link Class#cast(Object)}
+     */
+    @Override
+    public <T extends ControllerService> T getControllerService(final String identifier, final Class<T> serviceType) {
+        return serviceType.cast(context.getControllerService(identifier));
+    }
+
+    /**
+     * @param descriptor descriptor of the property to remove
+     * @return the value returned by the process context's {@code removeProperty}
+     */
+    @Override
+    public boolean removeProperty(PropertyDescriptor descriptor) {
+        final boolean removed = context.removeProperty(descriptor);
+        return removed;
+    }
+
+    /**
+     * @return the provenance events reported by the shared session state
+     */
+    @Override
+    public List<ProvenanceEventRecord> getProvenanceEvents() {
+        final List<ProvenanceEventRecord> events = sharedState.getProvenanceEvents();
+        return events;
+    }
+
+    /**
+     * Asks the shared session state to clear its provenance events.
+     */
+    @Override
+    public void clearProvenanceEvents() {
+        sharedState.clearProvenanceEvents();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
new file mode 100644
index 0000000..fb9fc78
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -0,0 +1,765 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.reporting.InitializationException;
+
+public interface TestRunner {
+
+    /**
+     * @return the {@link Processor} for which this <code>TestRunner</code> is
+     * configured
+     */
+    Processor getProcessor();
+
+    /**
+     * @return the {@link ProcessSessionFactory} that this
+     * <code>TestRunner</code> will use to invoke the
+     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} 
method
+     */
+    ProcessSessionFactory getProcessSessionFactory();
+
+    /**
+     * @return the {@link ProcessContext} that this <code>TestRunner</code> will
+     * use to invoke the
+     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) 
onTrigger}
+     * method
+     */
+    ProcessContext getProcessContext();
+
+    /**
+     * Performs exactly the same operation as calling {@link #run(int)} with a
+     * value of 1.
+     */
+    void run();
+
+    /**
+     * Performs the same operation as calling {@link #run(int, boolean)} with
+     * the given {@code iterations} and a {@code stopOnFinish} value of <code>true</code>
+     *
+     * @param iterations number of iterations
+     */
+    void run(int iterations);
+
+    /**
+     * Performs the same operation as calling {@link #run(int, boolean, boolean)}
+     * with the values {@code iterations}, {@code stopOnFinish}, {@code true}
+     *
+     * @param iterations number of iterations
+     * @param stopOnFinish flag to stop when finished
+     */
+    void run(int iterations, boolean stopOnFinish);
+
+    /**
+     * This method runs the {@link Processor} <code>iterations</code> times,
+     * using the sequence of steps below:
+     * <ul>
+     * <li>
+     * If {@code initialize} is true, run all methods on the Processor that are
+     * annotated with the
+     * {@link nifi.processor.annotation.OnScheduled @OnScheduled} annotation. 
If
+     * any of these methods throws an Exception, the Unit Test will fail.
+     * </li>
+     * <li>
+     * Schedule the
+     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) 
onTrigger}
+     * method to be invoked <code>iterations</code> times. The number of 
threads
+     * used to run these iterations is determined by the ThreadCount of this
+     * <code>TestRunner</code>. By default, the value is set to 1, but it can 
be
+     * modified by calling the {@link #setThreadCount(int)} method.
+     * </li>
+     * <li>
+     * As soon as the first thread finishes its execution of
+     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) 
onTrigger},
+     * all methods on the Processor that are annotated with the
+     * {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} 
annotation
+     * are invoked. If any of these methods throws an Exception, the Unit Test
+     * will fail.
+     * </li>
+     * <li>
+     * Waits for all threads to finish execution.
+     * </li>
+     * <li>
+     * If and only if the value of <code>stopOnFinish</code> is true: Call all
+     * methods on the Processor that are annotated with the
+     * {@link nifi.processor.annotation.OnStopped @OnStopped} annotation.
+     * </li>
+     * </ul>
+     *
+     * @param iterations number of iterations
+     * @param stopOnFinish whether or not to run the Processor methods that are
+     * annotated with {@link nifi.processor.annotation.OnStopped @OnStopped}
+     * @param initialize true if must initialize
+     */
+    void run(int iterations, boolean stopOnFinish, final boolean initialize);
+
+    /**
+     * This method runs the {@link Processor} <code>iterations</code> times,
+     * using the sequence of steps below:
+     * <ul>
+     * <li>
+     * If {@code initialize} is true, run all methods on the Processor that are
+     * annotated with the
+     * {@link nifi.processor.annotation.OnScheduled @OnScheduled} annotation. 
If
+     * any of these methods throws an Exception, the Unit Test will fail.
+     * </li>
+     * <li>
+     * Schedule the
+     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) 
onTrigger}
+     * method to be invoked <code>iterations</code> times. The number of 
threads
+     * used to run these iterations is determined by the ThreadCount of this
+     * <code>TestRunner</code>. By default, the value is set to 1, but it can 
be
+     * modified by calling the {@link #setThreadCount(int)} method.
+     * </li>
+     * <li>
+     * As soon as the first thread finishes its execution of
+     * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) 
onTrigger},
+     * all methods on the Processor that are annotated with the
+     * {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} 
annotation
+     * are invoked. If any of these methods throws an Exception, the Unit Test
+     * will fail.
+     * </li>
+     * <li>
+     * Waits for all threads to finish execution.
+     * </li>
+     * <li>
+     * If and only if the value of <code>stopOnFinish</code> is true: Call all
+     * methods on the Processor that are annotated with the
+     * {@link nifi.processor.annotation.OnStopped @OnStopped} annotation.
+     * </li>
+     * </ul>
+     *
+     * @param iterations number of iterations
+     * @param stopOnFinish whether or not to run the Processor methods that are
+     * annotated with {@link nifi.processor.annotation.OnStopped @OnStopped}
+     * @param initialize true if must initialize
+     * @param runWait indicates the amount of time in milliseconds that the 
framework should wait for
+     * processors to stop running before calling the {@link 
nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation
+     */
+    void run(int iterations, boolean stopOnFinish, final boolean initialize, 
final long runWait);
+
+    /**
+     * Invokes all methods on the Processor that are annotated with the
+     * {@link nifi.processor.annotation.OnShutdown @OnShutdown} annotation. If
+     * any of these methods throws an Exception, the Unit Test will fail
+     */
+    void shutdown();
+
+    /**
+     * Updates the number of threads that will be used to run the Processor 
when
+     * calling the {@link #run()} or {@link #run(int)} methods.
+     *
+     * @param threadCount num threads
+     */
+    void setThreadCount(int threadCount);
+
+    /**
+     * @return the currently configured number of threads that will be used to
+     * run the Processor when calling the {@link #run()} or {@link #run(int)}
+     * methods
+     */
+    int getThreadCount();
+
+    /**
+     * Updates the value of the property with the given name to
+     * the specified value IF and ONLY IF the value is valid according to the
+     * descriptor's validator. Otherwise, Assert.fail() is called, causing the
+     * unit test to fail
+     *
+     * @param propertyName name
+     * @param propertyValue value
+     * @return result
+     */
+    ValidationResult setProperty(String propertyName, String propertyValue);
+
+    /**
+     * Updates the value of the property with the given PropertyDescriptor to
+     * the specified value IF and ONLY IF the value is valid according to the
+     * descriptor's validator. Otherwise, Assert.fail() is called, causing the
+     * unit test to fail
+     *
+     * @param descriptor descriptor
+     * @param value value
+     * @return result
+     */
+    ValidationResult setProperty(PropertyDescriptor descriptor, String value);
+
+    /**
+     * Updates the value of the property with the given PropertyDescriptor to
+     * the specified value IF and ONLY IF the value is valid according to the
+     * descriptor's validator. Otherwise, Assert.fail() is called, causing the
+     * unit test to fail
+     *
+     * @param descriptor descriptor
+     * @param value allowable value
+     * @return result
+     */
+    ValidationResult setProperty(PropertyDescriptor descriptor, AllowableValue 
value);
+
+    /**
+     * Sets the annotation data.
+     *
+     * @param annotationData data
+     */
+    void setAnnotationData(String annotationData);
+
+    /**
+     * Asserts that all FlowFiles that were transferred were transferred to the
+     * given relationship
+     *
+     * @param relationship to verify
+     */
+    void assertAllFlowFilesTransferred(String relationship);
+
+    /**
+     * Asserts that all FlowFiles that were transferred were transferred to the
+     * given relationship
+     *
+     * @param relationship to verify
+     */
+    void assertAllFlowFilesTransferred(Relationship relationship);
+
+    /**
+     * Asserts that all FlowFiles that were transferred were transferred to the
+     * given relationship and that the number of FlowFiles transferred is equal
+     * to <code>count</code>
+     *
+     * @param relationship to verify
+     * @param count number of expected transfers
+     */
+    void assertAllFlowFilesTransferred(String relationship, int count);
+
+    /**
+     * Asserts that all FlowFiles that were transferred were transferred to the
+     * given relationship and that the number of FlowFiles transferred is equal
+     * to <code>count</code>
+     *
+     * @param relationship to verify
+     * @param count number of expected transfers
+     */
+    void assertAllFlowFilesTransferred(Relationship relationship, int count);
+
+    /**
+     * Assert that the number of FlowFiles transferred to the given 
relationship
+     * is equal to the given count
+     *
+     * @param relationship to verify
+     * @param count number of expected transfers
+     */
+    void assertTransferCount(Relationship relationship, int count);
+
+    /**
+     * Assert that the number of FlowFiles transferred to the given 
relationship
+     * is equal to the given count
+     *
+     * @param relationship to verify
+     * @param count number of expected transfers
+     */
+    void assertTransferCount(String relationship, int count);
+
+    /**
+     * Assert that there are no FlowFiles left on the input queue.
+     */
+    void assertQueueEmpty();
+
+    /**
+     * @return <code>true</code> if the Input Queue to the Processor is empty,
+     * <code>false</code> otherwise
+     */
+    boolean isQueueEmpty();
+
+    /**
+     * Assert that there is at least one FlowFile left on the input queue.
+     */
+    void assertQueueNotEmpty();
+
+    /**
+     * Assert that the currently configured set of properties/annotation data
+     * are valid
+     */
+    void assertValid();
+
+    /**
+     * Assert that the currently configured set of properties/annotation data
+     * are NOT valid
+     */
+    void assertNotValid();
+
+    /**
+     * Resets the Transfer Counts that indicate how many FlowFiles have been
+     * transferred to each Relationship and removes from memory any FlowFiles
+     * that have been transferred to those Relationships. This method should be
+     * called between successive calls to {@link #run(int) run} if the output 
is
+     * to be examined after each run.
+     */
+    void clearTransferState();
+
+    /**
+     * Enqueues the given FlowFiles into the Processor's input queue
+     *
+     * @param flowFiles to enqueue
+     */
+    void enqueue(FlowFile... flowFiles);
+
+    /**
+     * Reads the content from the given {@link Path} into memory and creates a
+     * FlowFile from this content with no attributes and adds this FlowFile to
+     * the Processor's Input Queue
+     *
+     * @param path to read content from
+     * @throws IOException if unable to read content
+     */
+    void enqueue(Path path) throws IOException;
+
+    /**
+     * Reads the content from the given {@link Path} into memory and creates a
+     * FlowFile from this content with the given attributes and adds this
+     * FlowFile to the Processor's Input Queue
+     *
+     * @param path to read content from
+     * @param attributes attributes to use for new flow file
+     * @throws IOException if unable to read content
+     */
+    void enqueue(Path path, Map<String, String> attributes) throws IOException;
+
+    /**
+     * Copies the content from the given byte array into memory and creates a
+     * FlowFile from this content with no attributes and adds this FlowFile to
+     * the Processor's Input Queue
+     *
+     * @param data to enqueue
+     */
+    void enqueue(byte[] data);
+
+    /**
+     * Copies the content from the given byte array into memory and creates a
+     * FlowFile from this content with the given attributes and adds this
+     * FlowFile to the Processor's Input Queue
+     *
+     * @param data to enqueue
+     * @param attributes to use for enqueued items
+     */
+    void enqueue(byte[] data, Map<String, String> attributes);
+
+    /**
+     * Reads the content from the given {@link InputStream} into memory and
+     * creates a FlowFile from this content with no attributes and adds this
+     * FlowFile to the Processor's Input Queue
+     *
+     * @param data to source data from
+     */
+    void enqueue(InputStream data);
+
+    /**
+     * Reads the content from the given {@link InputStream} into memory and
+     * creates a FlowFile from this content with the given attributes and adds
+     * this FlowFile to the Processor's Input Queue
+     *
+     * @param data source of data
+     * @param attributes to use for flow files
+     */
+    void enqueue(InputStream data, Map<String, String> attributes);
+
+    /**
+     * Copies the contents of the given {@link MockFlowFile} into a byte array
+     * and returns that byte array.
+     *
+     * @param flowFile to get content for
+     * @return byte array of flowfile content
+     */
+    byte[] getContentAsByteArray(MockFlowFile flowFile);
+
+    /**
+     * Returns a List of FlowFiles in the order in which they were transferred
+     * to the given relationship
+     *
+     * @param relationship to get flowfiles for
+     * @return flowfiles transferred to given relationship
+     */
+    List<MockFlowFile> getFlowFilesForRelationship(String relationship);
+
+    /**
+     * Returns a List of FlowFiles in the order in which they were transferred
+     * to the given relationship
+     *
+     * @param relationship to get flowfiles for
+     * @return flowfiles transferred to given relationship
+     */
+    List<MockFlowFile> getFlowFilesForRelationship(Relationship relationship);
+
+    /**
+     * @return the {@link ProvenanceReporter} that will be used by the
+     * configured {@link Processor} for reporting Provenance Events
+     */
+    ProvenanceReporter getProvenanceReporter();
+
+    /**
+     * @return the current size of the Processor's Input Queue
+     */
+    QueueSize getQueueSize();
+
+    /**
+     * @param name of counter
+     * @return the current value of the counter with the specified name, or 
null
+     * if no counter exists with the specified name
+     */
+    Long getCounterValue(String name);
+
+    /**
+     * @return the number of FlowFiles that have been removed from the system
+     */
+    int getRemovedCount();
+
+    /**
+     * Indicates to the Framework that the given Relationship should be
+     * considered "available", meaning that the queues of all Connections that
+     * contain this Relationship are not full. This is generally used only when
+     * dealing with Processors that use the
+     * {@link org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable}
+     * annotation.
+     *
+     * @param relationship to mark as available
+     */
+    void setRelationshipAvailable(Relationship relationship);
+
+    /**
+     * Indicates to the Framework that the given Relationship with the given
+     * name should be considered "available", meaning that the queues of all
+     * Connections that contain this Relationship are not full. This is
+     * generally used only when dealing with Processors that use the
+     * {@link org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable}
+     * annotation.
+     *
+     * @param relationshipName relationship name
+     */
+    void setRelationshipAvailable(String relationshipName);
+
+    /**
+     * Indicates to the Framework that the given Relationship should NOT be
+     * considered "available", meaning that the queue of at least one Connection
+     * that contains this Relationship is full. This is generally used only when
+     * dealing with Processors that use the
+     * {@link org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable}
+     * annotation.
+     *
+     * @param relationship to mark as unavailable
+     */
+    void setRelationshipUnavailable(Relationship relationship);
+
+    /**
+     * Indicates to the Framework that the Relationship with the given name
+     * should NOT be considered "available", meaning that the queue of at least
+     * one Connection that contains this Relationship is full. This is generally
+     * used only when dealing with Processors that use the
+     * {@link org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable}
+     * annotation.
+     *
+     * @param relationshipName name of relationship.
+     */
+    void setRelationshipUnavailable(String relationshipName);
+
+    /**
+     * Adds the given {@link ControllerService} to this TestRunner so that the
+     * configured Processor can access it using the given
+     * <code>identifier</code>. The ControllerService is not expected to be
+     * initialized, as the framework will create the appropriate
+     * {@link org.apache.nifi.controller.ControllerServiceInitializationContext ControllerServiceInitializationContext}
+     * and initialize the ControllerService with no specified properties.
+     *
+     * This will call any method on the given Controller Service that is
+     * annotated with the
+     * {@link org.apache.nifi.annotation.lifecycle.OnAdded @OnAdded} 
annotation.
+     *
+     * @param identifier of service
+     * @param service the service
+     * @throws InitializationException ie
+     */
+    void addControllerService(String identifier, ControllerService service) 
throws InitializationException;
+
+    /**
+     * Adds the given {@link ControllerService} to this TestRunner so that the
+     * configured Processor can access it using the given
+     * <code>identifier</code>. The ControllerService is not expected to be
+     * initialized, as the framework will create the appropriate
+     * {@link org.apache.nifi.controller.ControllerServiceInitializationContext ControllerServiceInitializationContext}
+     * and initialize the ControllerService with the given properties.
+     *
+     * This will call any method on the given Controller Service that is
+     * annotated with the
+     * {@link org.apache.nifi.annotation.lifecycle.OnAdded @OnAdded} 
annotation.
+     *
+     * @param identifier of service
+     * @param service the service
+     * @param properties service properties
+     * @throws InitializationException ie
+     */
+    void addControllerService(String identifier, ControllerService service, 
Map<String, String> properties) throws InitializationException;
+
+    /**
+     * <p>
+     * Marks the Controller Service as enabled so that it can be used by other
+     * components.
+     * </p>
+     *
+     * <p>
+     * This method will result in calling any method in the Controller Service
+     * that is annotated with the
+     * {@link org.apache.nifi.annotation.lifecycle.OnEnabled @OnEnabled}
+     * annotation.
+     * </p>
+     *
+     * @param service the service to enable
+     */
+    void enableControllerService(ControllerService service);
+
+    /**
+     * <p>
+     * Marks the Controller Service as disabled so that it cannot be used by
+     * other components.
+     * </p>
+     *
+     * <p>
+     * This method will result in calling any method in the Controller Service
+     * that is annotated with the
+     * {@link org.apache.nifi.annotation.lifecycle.OnDisabled @OnDisabled}
+     * annotation.
+     * </p>
+     *
+     * @param service the service to disable
+     */
+    void disableControllerService(ControllerService service);
+
+    /**
+     * @param service the service
+     * @return {@code true} if the given Controller Service is enabled,
+     * {@code false} if it is disabled
+     *
+     * @throws IllegalArgumentException if the given ControllerService is not
+     * known by this TestRunner (i.e., it has not been added via the
+     * {@link #addControllerService(String, ControllerService)} or
+     * {@link #addControllerService(String, ControllerService, Map)} method or
+     * if the Controller Service has been removed via the
+     * {@link #removeControllerService(ControllerService)} method).
+     */
+    boolean isControllerServiceEnabled(ControllerService service);
+
+    /**
+     * <p>
+     * Removes the Controller Service from the TestRunner. This will call any
+     * method on the ControllerService that is annotated with the
+     * {@link org.apache.nifi.annotation.lifecycle.OnRemoved @OnRemoved}
+     * annotation.
+     * </p>
+     *
+     * @param service the service
+     *
+     * @throws IllegalStateException if the ControllerService is not disabled
+     * @throws IllegalArgumentException if the given ControllerService is not
+     * known by this TestRunner (i.e., it has not been added via the
+     * {@link #addControllerService(String, ControllerService)} or
+     * {@link #addControllerService(String, ControllerService, Map)} method or
+     * if the Controller Service has been removed via the
+     * {@link #removeControllerService(ControllerService)} method).
+     *
+     */
+    void removeControllerService(ControllerService service);
+
+    /**
+     * Sets the given property on the given ControllerService
+     *
+     * @param service to modify
+     * @param property to modify
+     * @param value value to use
+     * @return result
+     *
+     * @throws IllegalStateException if the ControllerService is not disabled
+     * @throws IllegalArgumentException if the given ControllerService is not
+     * known by this TestRunner (i.e., it has not been added via the
+     * {@link #addControllerService(String, ControllerService)} or
+     * {@link #addControllerService(String, ControllerService, Map)} method or
+     * if the Controller Service has been removed via the
+     * {@link #removeControllerService(ControllerService)} method).
+     *
+     */
+    ValidationResult setProperty(ControllerService service, PropertyDescriptor 
property, String value);
+
+    /**
+     * Sets the given property on the given ControllerService
+     *
+     * @param service to modify
+     * @param property to modify
+     * @param value value to use
+     * @return result
+     *
+     * @throws IllegalStateException if the ControllerService is not disabled
+     * @throws IllegalArgumentException if the given ControllerService is not
+     * known by this TestRunner (i.e., it has not been added via the
+     * {@link #addControllerService(String, ControllerService)} or
+     * {@link #addControllerService(String, ControllerService, Map)} method or
+     * if the Controller Service has been removed via the
+     * {@link #removeControllerService(ControllerService)} method).
+     *
+     */
+    ValidationResult setProperty(ControllerService service, PropertyDescriptor 
property, AllowableValue value);
+
+    /**
+     * Sets the property with the given name on the given ControllerService
+     *
+     * @param service to modify
+     * @param propertyName to modify
+     * @param value value to use
+     * @return result
+     *
+     * @throws IllegalStateException if the ControllerService is not disabled
+     * @throws IllegalArgumentException if the given ControllerService is not
+     * known by this TestRunner (i.e., it has not been added via the
+     * {@link #addControllerService(String, ControllerService)} or
+     * {@link #addControllerService(String, ControllerService, Map)} method or
+     * if the Controller Service has been removed via the
+     * {@link #removeControllerService(ControllerService)} method).
+     *
+     */
+    ValidationResult setProperty(ControllerService service, String 
propertyName, String value);
+
+    /**
+     * Sets the annotation data of the given service to the provided annotation
+     * data.
+     *
+     * @param service to modify
+     * @param annotationData the data
+     *
+     * @throws IllegalStateException if the Controller Service is not disabled
+     *
+     * @throws IllegalArgumentException if the given ControllerService is not
+     * known by this TestRunner (i.e., it has not been added via the
+     * {@link #addControllerService(String, ControllerService)} or
+     * {@link #addControllerService(String, ControllerService, Map)} method or
+     * if the Controller Service has been removed via the
+     * {@link #removeControllerService(ControllerService)} method).
+     */
+    void setAnnotationData(ControllerService service, String annotationData);
+
+    /**
+     * @param identifier of controller service
+     * @return the {@link ControllerService} that is registered with the given
+     * identifier, or <code>null</code> if no Controller Service exists with 
the
+     * given identifier
+     */
+    ControllerService getControllerService(String identifier);
+
+    /**
+     * Assert that the currently configured set of properties/annotation data
+     * are valid for the given Controller Service.
+     *
+     * @param service the service to validate
+     * @throws IllegalArgumentException if the given ControllerService is not
+     * known by this TestRunner (i.e., it has not been added via the
+     * {@link #addControllerService(String, ControllerService)} or
+     * {@link #addControllerService(String, ControllerService, Map)} method or
+     * if the Controller Service has been removed via the
+     * {@link #removeControllerService(ControllerService)} method).
+     */
+    void assertValid(ControllerService service);
+
+    /**
+     * Assert that the currently configured set of properties/annotation data
+     * are NOT valid for the given Controller Service.
+     *
+     * @param service the service to validate
+     * @throws IllegalArgumentException if the given ControllerService is not
+     * known by this TestRunner (i.e., it has not been added via the
+     * {@link #addControllerService(String, ControllerService)} or
+     * {@link #addControllerService(String, ControllerService, Map)} method or
+     * if the Controller Service has been removed via the
+     * {@link #removeControllerService(ControllerService)} method).
+     *
+     */
+    void assertNotValid(ControllerService service);
+
+    /**
+     * @param <T> type of service
+     * @param identifier identifier of service
+     * @param serviceType type of service
+     * @return the {@link ControllerService} that is registered with the given
+     * identifier, cast as the provided service type, or <code>null</code> if 
no
+     * Controller Service exists with the given identifier
+     *
+     * @throws ClassCastException if the identifier given is registered for a
+     * Controller Service but that Controller Service is not of the type
+     * specified
+     */
+    <T extends ControllerService> T getControllerService(String identifier, 
Class<T> serviceType);
+
+    /**
+     * Specifies whether or not the TestRunner will validate the use of
+     * Expression Language. By default, the value is <code>true</code>, which
+     * means that an Exception will be thrown if the Processor attempts to
+     * obtain the configured value of a Property without calling
+     * {@link org.apache.nifi.components.PropertyValue#evaluateAttributeExpressions evaluateAttributeExpressions}
+     * on the Property Value or if
+     * {@link org.apache.nifi.components.PropertyValue#evaluateAttributeExpressions evaluateAttributeExpressions}
+     * is called but the PropertyDescriptor indicates that the Expression
+     * Language is not supported.
+     *
+     * <p>
+     * <b>See Also:
+     * </b>{@link 
PropertyDescriptor.Builder#expressionLanguageSupported(boolean)}
+     * </p>
+     *
+     * @param validate whether there is any need to validate the EL was used
+     */
+    void setValidateExpressionUsage(boolean validate);
+
+    /**
+     * Removes the {@link PropertyDescriptor} from the {@link ProcessContext},
+     * effectively setting its value to null.
+     *
+     * @param descriptor of property to remove
+     * @return true if removed
+     */
+    boolean removeProperty(PropertyDescriptor descriptor);
+
+    /**
+     * Returns a {@link List} of all {@link ProvenanceEventRecord}s that were
+     * emitted by the Processor
+     *
+     * @return a List of all Provenance Events that were emitted by the
+     *         Processor
+     */
+    List<ProvenanceEventRecord> getProvenanceEvents();
+
+    /**
+     * Clears the Provenance Events that have been emitted by the Processor
+     */
+    void clearProvenanceEvents();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
new file mode 100644
index 0000000..f2b0b23
--- /dev/null
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.processor.Processor;
+
+/**
+ * Static factory methods for obtaining {@link TestRunner} instances.
+ */
+public class TestRunners {
+
+    /**
+     * Wraps an already-constructed Processor in a TestRunner.
+     *
+     * @param processor the Processor instance the runner will exercise
+     * @return a new {@link StandardProcessorTestRunner} for the given Processor
+     */
+    public static TestRunner newTestRunner(final Processor processor) {
+        final TestRunner runner = new StandardProcessorTestRunner(processor);
+        return runner;
+    }
+
+    /**
+     * Creates an instance of the given Processor class via
+     * {@link Class#newInstance()} and wraps it in a TestRunner.
+     *
+     * @param processorClass the Processor implementation to instantiate
+     * @return a TestRunner for a fresh instance of the given class
+     * @throws RuntimeException wrapping any Exception raised while
+     * instantiating the class or creating the runner; the failure is also
+     * reported on standard error
+     */
+    public static TestRunner newTestRunner(final Class<? extends Processor> processorClass) {
+        try {
+            final Processor instance = processorClass.newInstance();
+            return newTestRunner(instance);
+        } catch (final Exception failure) {
+            final String report = "Could not instantiate instance of class " + processorClass.getName() + " due to: " + failure;
+            System.err.println(report);
+            throw new RuntimeException(failure);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java
 
b/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java
new file mode 100644
index 0000000..6b403af
--- /dev/null
+++ 
b/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that {@link StandardProcessorTestRunner#run(int)} completes every
+ * requested iteration while the processor still reports itself as scheduled.
+ */
+public class CurrentTestStandardProcessorTestRunner {
+
+    /**
+     * This test will verify that all iterations of the run are finished
+     * before unscheduled is called
+     */
+    @Test
+    public void testOnScheduledCalledAfterRunFinished() {
+        SlowRunProcessor processor = new SlowRunProcessor();
+        StandardProcessorTestRunner runner = new StandardProcessorTestRunner(processor);
+        final int iterations = 5;
+        runner.run(iterations);
+        // if the counter is not equal to iterations, then the processor must
+        // have been unscheduled before all the run calls were made, that
+        // would be bad.
+        Assert.assertEquals(iterations, processor.getCounter());
+    }
+
+    /**
+     * This processor simulates a "slow" processor that checks whether it is
+     * scheduled before doing something: each trigger sleeps briefly and then
+     * increments a counter only if the processor is still scheduled.
+     */
+    private static class SlowRunProcessor extends AbstractProcessor {
+
+        // NOTE(review): plain int; presumably run() has finished all triggers
+        // before getCounter() is read - confirm against the runner.
+        private int counter = 0;
+
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+            try {
+                // be slow
+                Thread.sleep(50);
+                // make sure we are still scheduled
+                if (isScheduled()) {
+                    // increment counter
+                    ++counter;
+                }
+            } catch (InterruptedException e) {
+                // Do not swallow the interrupt: restore the flag so the
+                // thread that is running this trigger can still observe it.
+                Thread.currentThread().interrupt();
+            }
+
+        }
+
+        /**
+         * @return the number of triggers that completed while the processor
+         * was still scheduled
+         */
+        public int getCounter() {
+            return counter;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
new file mode 100644
index 0000000..323a357
--- /dev/null
+++ 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests describing how {@link StandardProcessorTestRunner} is expected to
+ * react when it is constructed with processors that use the deprecated
+ * <code>org.apache.nifi.processor.annotation</code> annotations versus the
+ * <code>org.apache.nifi.annotation</code> ones. The whole class is currently
+ * disabled through {@link Ignore}.
+ */
+@Ignore("This should not be enabled until we actually fail processor unit tests for using deprecated methods, which should happen in 0.1.0")
+public class TestStandardProcessorTestRunner {
+
+    // ---- type-level annotations ----
+
+    @Test(expected = AssertionError.class)
+    public void testFailOnDeprecatedTypeAnnotation() {
+        buildRunnerFor(new DeprecatedAnnotation());
+    }
+
+    @Test
+    public void testDoesNotFailOnNonDeprecatedTypeAnnotation() {
+        buildRunnerFor(new NewAnnotation());
+    }
+
+    // ---- method-level annotations ----
+
+    @Test(expected = AssertionError.class)
+    public void testFailOnDeprecatedMethodAnnotation() {
+        buildRunnerFor(new DeprecatedMethodAnnotation());
+    }
+
+    @Test
+    public void testDoesNotFailOnNonDeprecatedMethodAnnotation() {
+        buildRunnerFor(new NewMethodAnnotation());
+    }
+
+    /**
+     * Constructs a runner around the given processor; the tests only care
+     * whether construction itself raises an error.
+     */
+    private static void buildRunnerFor(final AbstractProcessor processor) {
+        new StandardProcessorTestRunner(processor);
+    }
+
+    /** Processor whose class carries the deprecated Tags annotation. */
+    @SuppressWarnings("deprecation")
+    @org.apache.nifi.processor.annotation.Tags({"deprecated"})
+    private static class DeprecatedAnnotation extends AbstractProcessor {
+
+        @Override
+        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+            // intentionally empty
+        }
+    }
+
+    /** Processor whose class carries the non-deprecated Tags annotation. */
+    @org.apache.nifi.annotation.documentation.Tags({"deprecated"})
+    private static class NewAnnotation extends AbstractProcessor {
+
+        @Override
+        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+            // intentionally empty
+        }
+    }
+
+    /** Processor with a method carrying the deprecated OnScheduled annotation. */
+    private static class DeprecatedMethodAnnotation extends AbstractProcessor {
+
+        @SuppressWarnings("deprecation")
+        @org.apache.nifi.processor.annotation.OnScheduled
+        public void dummy() {
+            // intentionally empty
+        }
+
+        @Override
+        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+            // intentionally empty
+        }
+    }
+
+    /** Processor with a method carrying the non-deprecated OnScheduled annotation. */
+    private static class NewMethodAnnotation extends AbstractProcessor {
+
+        @org.apache.nifi.annotation.lifecycle.OnScheduled
+        public void dummy() {
+            // intentionally empty
+        }
+
+        @Override
+        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+            // intentionally empty
+        }
+    }
+}

Reply via email to