http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java new file mode 100644 index 0000000..d73a09b --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.attribute.expression.language.Query; +import org.apache.nifi.attribute.expression.language.Query.Range; +import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.expression.ExpressionLanguageCompiler; + +public class MockValidationContext implements ValidationContext, ControllerServiceLookup { + + private final MockProcessContext context; + private final Map<String, Boolean> expressionLanguageSupported; + + public MockValidationContext(final MockProcessContext processContext) { + this.context = processContext; + + final Map<PropertyDescriptor, String> properties = processContext.getProperties(); + expressionLanguageSupported = new HashMap<>(properties.size()); + for (final PropertyDescriptor descriptor : properties.keySet()) { + expressionLanguageSupported.put(descriptor.getName(), descriptor.isExpressionLanguageSupported()); + } + } + + @Override + public ControllerService getControllerService(final String identifier) { + return context.getControllerService(identifier); + } + + @Override + public PropertyValue newPropertyValue(final String rawValue) { + return new MockPropertyValue(rawValue, this); + } + + @Override + public ExpressionLanguageCompiler newExpressionLanguageCompiler() { + return new StandardExpressionLanguageCompiler(); + } + + @Override + public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) { + final MockProcessContext serviceProcessContext = new MockProcessContext(controllerService, context); + return new MockValidationContext(serviceProcessContext); + } + + @Override + public PropertyValue getProperty(final PropertyDescriptor property) { + return context.getProperty(property); + } + + @Override + public Map<PropertyDescriptor, String> getProperties() { + return context.getProperties(); + } + + @Override + public String getAnnotationData() { + return context.getAnnotationData(); + } + + @Override + public Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType) { + return context.getControllerServiceIdentifiers(serviceType); + } + + @Override + public ControllerServiceLookup getControllerServiceLookup() { + return this; + } + + @Override + public boolean isControllerServiceEnabled(final String serviceIdentifier) { + return context.isControllerServiceEnabled(serviceIdentifier); + } + + @Override + public boolean isControllerServiceEnabled(final ControllerService service) { + return context.isControllerServiceEnabled(service); + } + + @Override + public String getControllerServiceName(final String serviceIdentifier) { + final ControllerServiceConfiguration configuration = context.getConfiguration(serviceIdentifier); + return configuration == null ? null : serviceIdentifier; + } + + @Override + public boolean isValidationRequired(final ControllerService service) { + return true; + } + + @Override + public boolean isControllerServiceEnabling(String serviceIdentifier) { + return context.isControllerServiceEnabling(serviceIdentifier); + } + + public boolean isExpressionLanguagePresent(final String value) { + if (value == null) { + return false; + } + + final List<Range> elRanges = Query.extractExpressionRanges(value); + return (elRanges != null && !elRanges.isEmpty()); + } + + @Override + public boolean isExpressionLanguageSupported(final String propertyName) { + final Boolean supported = expressionLanguageSupported.get(propertyName); + return Boolean.TRUE.equals(supported); + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java new file mode 100644 index 0000000..940eeea --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/ReflectionUtils.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReflectionUtils { + + private final static Logger LOG = LoggerFactory.getLogger(ReflectionUtils.class); + + /** + * Invokes all methods on the given instance that have been annotated with + * the given Annotation. If the signature of the method that is defined in + * <code>instance</code> uses 1 or more parameters, those parameters must be + * specified by the <code>args</code> parameter. However, if more arguments + * are supplied by the <code>args</code> parameter than needed, the extra + * arguments will be ignored. + * + * @param annotation the annotation to look for + * @param instance to invoke a method of + * @param args to supply in a method call + * @throws InvocationTargetException ite + * @throws IllegalArgumentException iae + * @throws IllegalAccessException if not allowed to invoke that method + */ + public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) + throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + for (final Method method : instance.getClass().getMethods()) { + if (method.isAnnotationPresent(annotation)) { + final boolean isAccessible = method.isAccessible(); + method.setAccessible(true); + + try { + final Class<?>[] argumentTypes = method.getParameterTypes(); + if (argumentTypes.length > args.length) { + throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given", + method.getName(), instance, argumentTypes.length, args.length)); + } + + for (int i = 0; i < argumentTypes.length; i++) { + final Class<?> argType = argumentTypes[i]; + if (!argType.isAssignableFrom(args[i].getClass())) { + throw new IllegalArgumentException(String.format( + "Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s", + method.getName(), instance, i, argType, args[i].getClass())); + } + } + + if (argumentTypes.length == args.length) { + method.invoke(instance, args); + } else { + final Object[] argsToPass = new Object[argumentTypes.length]; + for (int i = 0; i < argsToPass.length; i++) { + argsToPass[i] = args[i]; + } + + method.invoke(instance, argsToPass); + } + } finally { + if (!isAccessible) { + method.setAccessible(false); + } + } + } + } + } + + /** + * Invokes all methods on the given instance that have been annotated with + * the given Annotation. If the signature of the method that is defined in + * <code>instance</code> uses 1 or more parameters, those parameters must be + * specified by the <code>args</code> parameter. However, if more arguments + * are supplied by the <code>args</code> parameter than needed, the extra + * arguments will be ignored. + * + * @param annotation the annotation to look for + * @param instance to invoke a method of + * @param args to supply in a method call + * @return <code>true</code> if all appropriate methods were invoked and + * returned without throwing an Exception, <code>false</code> if one of the + * methods threw an Exception or could not be invoked; if <code>false</code> + * is returned, an error will have been logged. + */ + public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) { + for (final Method method : instance.getClass().getMethods()) { + if (method.isAnnotationPresent(annotation)) { + final boolean isAccessible = method.isAccessible(); + method.setAccessible(true); + + try { + final Class<?>[] argumentTypes = method.getParameterTypes(); + if (argumentTypes.length > args.length) { + LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given", + new Object[]{method.getName(), instance, argumentTypes.length, args.length}); + return false; + } + + for (int i = 0; i < argumentTypes.length; i++) { + final Class<?> argType = argumentTypes[i]; + if (!argType.isAssignableFrom(args[i].getClass())) { + LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}", + new Object[]{method.getName(), instance, i, argType, args[i].getClass()}); + return false; + } + } + + try { + if (argumentTypes.length == args.length) { + method.invoke(instance, args); + } else { + final Object[] argsToPass = new Object[argumentTypes.length]; + for (int i = 0; i < argsToPass.length; i++) { + argsToPass[i] = args[i]; + } + + method.invoke(instance, argsToPass); + } + } catch (final IllegalAccessException | IllegalArgumentException | InvocationTargetException t) { + LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t}); + LOG.error("", t); + return false; + } + } finally { + if (!isAccessible) { + method.setAccessible(false); + } + } + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java new file mode 100644 index 0000000..994735b --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/SharedSessionState.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceReporter; + +public class SharedSessionState { + + private final MockFlowFileQueue flowFileQueue; + private final ProvenanceReporter provenanceReporter; + @SuppressWarnings("unused") + private final Processor processor; + private final AtomicLong flowFileIdGenerator; + private final ConcurrentMap<String, AtomicLong> counterMap = new ConcurrentHashMap<>(); + private final Set<ProvenanceEventRecord> events = new LinkedHashSet<>(); + + public SharedSessionState(final Processor processor, final AtomicLong flowFileIdGenerator) { + flowFileQueue = new MockFlowFileQueue(); + provenanceReporter = new MockProvenanceReporter(null, this, UUID.randomUUID().toString(), "N/A"); + this.flowFileIdGenerator = flowFileIdGenerator; + this.processor = processor; + } + + void addProvenanceEvents(final Collection<ProvenanceEventRecord> events) { + this.events.addAll(events); + } + + void clearProvenanceEvents() { + this.events.clear(); + } + + public List<ProvenanceEventRecord> getProvenanceEvents() { + return new ArrayList<>(this.events); + } + + public MockFlowFileQueue getFlowFileQueue() { + return flowFileQueue; + } + + public ProvenanceReporter getProvenanceReporter() { + return provenanceReporter; + } + + public long nextFlowFileId() { + return flowFileIdGenerator.getAndIncrement(); + } + + public void adjustCounter(final String name, final long delta) { + AtomicLong counter = counterMap.get(name); + if (counter == null) { + counter = new AtomicLong(0L); + final AtomicLong existingCounter = counterMap.putIfAbsent(name, counter); + if (existingCounter != null) { + counter = existingCounter; + } + } + + counter.addAndGet(delta); + } + + public Long getCounterValue(final String name) { + final AtomicLong counterValue = counterMap.get(name); + return counterValue == null ? null : counterValue.get(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/SingleSessionFactory.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/SingleSessionFactory.java b/nifi-mock/src/main/java/org/apache/nifi/util/SingleSessionFactory.java new file mode 100644 index 0000000..ef518dc --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/SingleSessionFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; + +public class SingleSessionFactory implements ProcessSessionFactory { + + private final MockProcessSession session; + + public SingleSessionFactory(final MockProcessSession session) { + this.session = session; + } + + @Override + public ProcessSession createSession() { + return session; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java new file mode 100644 index 0000000..048e2b9 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -0,0 +1,728 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.lifecycle.OnAdded; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnRemoved; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.QueueSize; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceReporter; +import org.apache.nifi.reporting.InitializationException; +import org.junit.Assert; + +public class StandardProcessorTestRunner implements TestRunner { + + private final Processor processor; + private final MockProcessContext context; + private final MockFlowFileQueue flowFileQueue; + private final MockSessionFactory sessionFactory; + private final SharedSessionState sharedState; + private final AtomicLong idGenerator; + private final boolean triggerSerially; + + private int numThreads = 1; + private final AtomicInteger invocations = new AtomicInteger(0); + + private static final Set<Class<? extends Annotation>> deprecatedTypeAnnotations = new HashSet<>(); + private static final Set<Class<? extends Annotation>> deprecatedMethodAnnotations = new HashSet<>(); + + static { + // do this in a separate method, just so that we can add a @SuppressWarnings annotation + // because we want to indicate explicitly that we know that we are using deprecated + // classes here. + populateDeprecatedMethods(); + } + + StandardProcessorTestRunner(final Processor processor) { + this.processor = processor; + this.idGenerator = new AtomicLong(0L); + this.sharedState = new SharedSessionState(processor, idGenerator); + this.flowFileQueue = sharedState.getFlowFileQueue(); + this.sessionFactory = new MockSessionFactory(sharedState, processor); + this.context = new MockProcessContext(processor); + + detectDeprecatedAnnotations(processor); + + final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context); + processor.initialize(mockInitContext); + + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); + } catch (final Exception e) { + Assert.fail("Could not invoke methods annotated with @OnAdded annotation due to: " + e); + } + + triggerSerially = null != processor.getClass().getAnnotation(TriggerSerially.class); + } + + @SuppressWarnings("deprecation") + private static void populateDeprecatedMethods() { + deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.CapabilityDescription.class); + deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.EventDriven.class); + deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.SideEffectFree.class); + deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.SupportsBatching.class); + deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.Tags.class); + deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class); + deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class); + deprecatedTypeAnnotations.add(org.apache.nifi.processor.annotation.TriggerSerially.class); + + deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnRemoved.class); + deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnAdded.class); + deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnScheduled.class); + deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnShutdown.class); + deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnStopped.class); + deprecatedMethodAnnotations.add(org.apache.nifi.processor.annotation.OnUnscheduled.class); + } + + private static void detectDeprecatedAnnotations(final Processor processor) { + for (final Class<? extends Annotation> annotationClass : deprecatedTypeAnnotations) { + if (processor.getClass().isAnnotationPresent(annotationClass)) { + Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName()); + } + } + + for (final Class<? extends Annotation> annotationClass : deprecatedMethodAnnotations) { + for (final Method method : processor.getClass().getMethods()) { + if (method.isAnnotationPresent(annotationClass)) { + Assert.fail("Processor is using deprecated Annotation " + annotationClass.getCanonicalName() + " for method " + method); + } + } + } + + } + + @Override + public void setValidateExpressionUsage(final boolean validate) { + context.setValidateExpressionUsage(validate); + } + + @Override + public Processor getProcessor() { + return processor; + } + + @Override + public MockProcessContext getProcessContext() { + return context; + } + + @Override + public void run() { + run(1); + } + + @Override + public void run(int iterations) { + run(iterations, true); + } + + @Override + public void run(final int iterations, final boolean stopOnFinish) { + run(iterations, stopOnFinish, true); + } + + @Override + public void run(final int iterations, final boolean stopOnFinish, final boolean initialize) { + run(iterations, stopOnFinish, initialize, 5000); + } + + @Override + public void run(final int iterations, final boolean stopOnFinish, final boolean initialize, final long runWait) { + if (iterations < 1) { + throw new IllegalArgumentException(); + } + + context.assertValid(); + context.enableExpressionValidation(); + try { + if (initialize) { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, context); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail("Could not invoke methods annotated with @OnScheduled annotation due to: " + e); + } + } + + final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + @SuppressWarnings("unchecked") + final Future<Throwable>[] futures = new Future[iterations]; + for (int i = 0; i < iterations; i++) { + final Future<Throwable> future = executorService.submit(new RunProcessor()); + futures[i] = future; + } + + executorService.shutdown(); + try { + executorService.awaitTermination(runWait, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e1) { + } + + int finishedCount = 0; + boolean unscheduledRun = false; + for (final Future<Throwable> future : futures) { + try { + final Throwable thrown = future.get(); // wait for the result + if (thrown != null) { + throw new AssertionError(thrown); + } + + if (++finishedCount == 1) { + unscheduledRun = true; + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context); + } catch (final Exception e) { + Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e); + } + } + } catch (final Exception e) { + } + } + + if (!unscheduledRun) { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, processor, context); + } catch (final Exception e) { + Assert.fail("Could not invoke methods annotated with @OnUnscheduled annotation due to: " + e); + } + } + + if (stopOnFinish) { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnStopped.class, processor); + } catch (final Exception e) { + Assert.fail("Could not invoke methods annotated with @OnStopped annotation due to: " + e); + } + } + } finally { + context.disableExpressionValidation(); + } + } + + @Override + public void shutdown() { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnShutdown.class, processor); + } catch (final Exception e) { + Assert.fail("Could not invoke methods annotated with @OnShutdown annotation due to: " + e); + } + } + + private class RunProcessor implements Callable<Throwable> { + + @Override + public Throwable call() throws Exception { + invocations.incrementAndGet(); + try { + processor.onTrigger(context, sessionFactory); + } catch (final Throwable t) { + return t; + } + + return null; + } + } + + @Override + public ProcessSessionFactory getProcessSessionFactory() { + return sessionFactory; + } + + @Override + public void assertAllFlowFilesTransferred(final String relationship) { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.assertAllFlowFilesTransferred(relationship); + } + } + + @Override + public void assertAllFlowFilesTransferred(final Relationship relationship) { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.assertAllFlowFilesTransferred(relationship); + } + } + + @Override + public void assertAllFlowFilesTransferred(final String relationship, final int count) { + assertAllFlowFilesTransferred(relationship); + assertTransferCount(relationship, count); + } + + @Override + public void assertAllFlowFilesTransferred(final Relationship relationship, final int count) { + assertAllFlowFilesTransferred(relationship); + assertTransferCount(relationship, count); + } + + @Override + public void assertTransferCount(final Relationship relationship, final int count) { + Assert.assertEquals(count, getFlowFilesForRelationship(relationship).size()); + } + + @Override + public void assertTransferCount(final String relationship, final int count) { + Assert.assertEquals(count, getFlowFilesForRelationship(relationship).size()); + } + + @Override + public void assertValid() { + context.assertValid(); + } + + @Override + public void assertNotValid() { + Assert.assertFalse("Processor appears to be valid but expected it to be invalid", context.isValid()); + } + + @Override + public boolean isQueueEmpty() { + return flowFileQueue.isEmpty(); + } + + @Override + public void assertQueueEmpty() { + Assert.assertTrue(flowFileQueue.isEmpty()); + } + + @Override + public void assertQueueNotEmpty() { + Assert.assertFalse(flowFileQueue.isEmpty()); + } + + @Override + public void clearTransferState() { + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + session.clearTransferState(); + } + } + + @Override + public void enqueue(final FlowFile... flowFiles) { + for (final FlowFile flowFile : flowFiles) { + flowFileQueue.offer((MockFlowFile) flowFile); + } + } + + @Override + public void enqueue(final Path path) throws IOException { + enqueue(path, new HashMap<String, String>()); + } + + @Override + public void enqueue(final Path path, final Map<String, String> attributes) throws IOException { + final Map<String, String> modifiedAttributes = new HashMap<>(attributes); + if (!modifiedAttributes.containsKey(CoreAttributes.FILENAME.key())) { + modifiedAttributes.put(CoreAttributes.FILENAME.key(), path.toFile().getName()); + } + try (final InputStream in = Files.newInputStream(path)) { + enqueue(in, modifiedAttributes); + } + } + + @Override + public void enqueue(final byte[] data) { + enqueue(data, new HashMap<String, String>()); + } + + @Override + public void enqueue(final byte[] data, final Map<String, String> attributes) { + enqueue(new ByteArrayInputStream(data), attributes); + } + + @Override + public void enqueue(final InputStream data) { + enqueue(data, new HashMap<String, String>()); + } + + @Override + public void enqueue(final InputStream data, final Map<String, String> attributes) { + final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor); + MockFlowFile flowFile = session.create(); + flowFile = session.importFrom(data, flowFile); + flowFile = session.putAllAttributes(flowFile, attributes); + enqueue(flowFile); + } + + @Override + public byte[] getContentAsByteArray(final MockFlowFile flowFile) { + return flowFile.getData(); + } + + @Override + public List<MockFlowFile> getFlowFilesForRelationship(final String relationship) { + final Relationship rel = new Relationship.Builder().name(relationship).build(); + return getFlowFilesForRelationship(rel); + } + + @Override + public List<MockFlowFile> getFlowFilesForRelationship(final Relationship relationship) { + final List<MockFlowFile> flowFiles = new ArrayList<>(); + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + flowFiles.addAll(session.getFlowFilesForRelationship(relationship)); + } + + Collections.sort(flowFiles, new Comparator<MockFlowFile>() { + @Override + public int compare(final MockFlowFile o1, final MockFlowFile o2) { + return Long.compare(o1.getCreationTime(), o2.getCreationTime()); + } + }); + + return flowFiles; + } + + /** + * @deprecated The ProvenanceReporter should not be accessed through the test runner, as it does not expose the events that were emitted. + */ + @Override + @Deprecated + public ProvenanceReporter getProvenanceReporter() { + return sharedState.getProvenanceReporter(); + } + + @Override + public QueueSize getQueueSize() { + return flowFileQueue.size(); + } + + @Override + public Long getCounterValue(final String name) { + return sharedState.getCounterValue(name); + } + + @Override + public int getRemovedCount() { + int count = 0; + for (final MockProcessSession session : sessionFactory.getCreatedSessions()) { + count += session.getRemovedCount(); + } + + return count; + } + + @Override + public void setAnnotationData(final String annotationData) { + context.setAnnotationData(annotationData); + } + + @Override + public ValidationResult setProperty(final String propertyName, final String propertyValue) { + return context.setProperty(propertyName, propertyValue); + } + + @Override + public ValidationResult setProperty(final PropertyDescriptor descriptor, final String value) { + return context.setProperty(descriptor, value); + } + + @Override + public ValidationResult setProperty(final PropertyDescriptor descriptor, final AllowableValue value) { + return context.setProperty(descriptor, value.getValue()); + } + + @Override + public void setThreadCount(final int threadCount) { + if (threadCount > 1 && triggerSerially) { + Assert.fail("Cannot set thread-count higher than 1 because the processor is triggered serially"); + } + + this.numThreads = threadCount; + } + + @Override + public int getThreadCount() { + return numThreads; + } + + @Override + public void setRelationshipAvailable(final Relationship relationship) { + final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships()); + unavailable.remove(relationship); + context.setUnavailableRelationships(unavailable); + } + + @Override + public void setRelationshipAvailable(final String relationshipName) { + setRelationshipAvailable(new Relationship.Builder().name(relationshipName).build()); + } + + @Override + public void setRelationshipUnavailable(final Relationship relationship) { + final Set<Relationship> unavailable = new HashSet<>(context.getUnavailableRelationships()); + unavailable.add(relationship); + context.setUnavailableRelationships(unavailable); + } + + @Override + public void setRelationshipUnavailable(final String relationshipName) { + setRelationshipUnavailable(new Relationship.Builder().name(relationshipName).build()); + } + + @Override + public void addControllerService(final String identifier, final ControllerService service) throws InitializationException { + addControllerService(identifier, service, new HashMap<String, String>()); + } + + @Override + public void addControllerService(final String identifier, final ControllerService service, final Map<String, String> properties) throws InitializationException { + // hold off on failing due to deprecated annotation for now... will introduce later. +// for ( final Method method : service.getClass().getMethods() ) { +// if ( method.isAnnotationPresent(org.apache.nifi.controller.annotation.OnConfigured.class) ) { +// Assert.fail("Controller Service " + service + " is using deprecated Annotation " + org.apache.nifi.controller.annotation.OnConfigured.class + " for method " + method); +// } +// } + + final ComponentLog logger = new MockProcessorLog(identifier, service); + final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(requireNonNull(service), requireNonNull(identifier), logger); + service.initialize(initContext); + + final Map<PropertyDescriptor, String> resolvedProps = new HashMap<>(); + for (final Map.Entry<String, String> entry : properties.entrySet()) { + resolvedProps.put(service.getPropertyDescriptor(entry.getKey()), entry.getValue()); + } + + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, service); + } catch (final InvocationTargetException | IllegalAccessException | IllegalArgumentException e) { + throw new InitializationException(e); + } + + context.addControllerService(identifier, service, resolvedProps, null); + } + + @Override + public void assertNotValid(final ControllerService service) { + final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); + final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext); + + for (final ValidationResult result : results) { + if (!result.isValid()) { + return; + } + } + + Assert.fail("Expected Controller Service " + service + " to be invalid but it is valid"); + } + + @Override + public void assertValid(final ControllerService service) { + final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); + final Collection<ValidationResult> results = context.getControllerService(service.getIdentifier()).validate(validationContext); + + for (final ValidationResult result : results) { + if (!result.isValid()) { + Assert.fail("Expected Controller Service to be valid but it is invalid due to: " + result.toString()); + } + } + } + + @Override + public void disableControllerService(final ControllerService service) { + final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier()); + if (configuration == null) { + throw new IllegalArgumentException("Controller Service " + service + " is not known"); + } + + if (!configuration.isEnabled()) { + throw new IllegalStateException("Controller service " + service + " cannot be disabled because it is not enabled"); + } + + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail("Failed to disable Controller Service " + service + " due to " + e); + } + + configuration.setEnabled(false); + } + + @Override + public void enableControllerService(final ControllerService service) { + final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier()); + if (configuration == null) { + throw new IllegalArgumentException("Controller Service " + service + " is not known"); + } + + if (configuration.isEnabled()) { + throw new IllegalStateException("Cannot enable Controller Service " + service + " because it is not disabled"); + } + + try { + final ConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), context); + ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, configContext); + } catch (final InvocationTargetException ite) { + ite.getCause().printStackTrace(); + Assert.fail("Failed to enable Controller Service " + service + " due to " + ite.getCause()); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail("Failed to enable Controller Service " + service + " due to " + e); + } + + configuration.setEnabled(true); + } + + @Override + public boolean isControllerServiceEnabled(final ControllerService service) { + final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier()); + if (configuration == null) { + throw new IllegalArgumentException("Controller Service " + service + " is not known"); + } + + return configuration.isEnabled(); + } + + @Override + public void removeControllerService(final ControllerService service) { + disableControllerService(service); + + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, service); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail("Failed to remove Controller Service " + service + " due to " + e); + } + + context.removeControllerService(service); + } + + @Override + public void setAnnotationData(final ControllerService service, final String annotationData) { + final ControllerServiceConfiguration configuration = getConfigToUpdate(service); + configuration.setAnnotationData(annotationData); + } + + private ControllerServiceConfiguration getConfigToUpdate(final ControllerService service) { + final ControllerServiceConfiguration configuration = context.getConfiguration(service.getIdentifier()); + if (configuration == null) { + throw new IllegalArgumentException("Controller Service " + service + " is not known"); + } + + if (configuration.isEnabled()) { + throw new IllegalStateException("Controller service " + service + " cannot be modified because it is not disabled"); + } + + return configuration; + } + + @Override + public ValidationResult setProperty(final ControllerService service, final PropertyDescriptor property, final AllowableValue value) { + return setProperty(service, property, value.getValue()); + } + + @Override + public ValidationResult setProperty(final ControllerService service, final PropertyDescriptor property, final String value) { + final ControllerServiceConfiguration configuration = getConfigToUpdate(service); + final Map<PropertyDescriptor, String> curProps = configuration.getProperties(); + final Map<PropertyDescriptor, String> updatedProps = new HashMap<>(curProps); + + final ValidationContext validationContext = new MockValidationContext(context).getControllerServiceValidationContext(service); + final ValidationResult validationResult = property.validate(value, validationContext); + + updatedProps.put(property, value); + configuration.setProperties(updatedProps); + + return validationResult; + } + + @Override + public ValidationResult setProperty(final ControllerService service, final String propertyName, final String value) { + final PropertyDescriptor descriptor = service.getPropertyDescriptor(propertyName); + if (descriptor == null) { + return new ValidationResult.Builder() + .input(propertyName) + .explanation(propertyName + " is not a known Property for Controller Service " + service) + .subject("Invalid property") + .valid(false) + .build(); + } + return setProperty(service, descriptor, value); + } + + @Override + public ControllerService getControllerService(final String identifier) { + return context.getControllerService(identifier); + } + + @Override + public <T extends ControllerService> T getControllerService(final String identifier, final Class<T> serviceType) { + final ControllerService service = context.getControllerService(identifier); + return serviceType.cast(service); + } + + @Override + public boolean removeProperty(PropertyDescriptor descriptor) { + return context.removeProperty(descriptor); + } + + @Override + public List<ProvenanceEventRecord> getProvenanceEvents() { + return sharedState.getProvenanceEvents(); + } + + @Override + public void clearProvenanceEvents() { + sharedState.clearProvenanceEvents(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java new file mode 100644 index 0000000..fb9fc78 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -0,0 +1,765 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.QueueSize; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceReporter; +import org.apache.nifi.reporting.InitializationException; + +public interface TestRunner { + + /** + * @return the {@link Processor} for which this <code>TestRunner</code> is + * configured + */ + Processor getProcessor(); + + /** + * @return the {@link ProcessSessionFactory} that this + * <code>TestRunner</code> will use to invoke the + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} method + */ + ProcessSessionFactory getProcessSessionFactory(); + + /** + * @return the {@Link ProcessContext} that this <code>TestRunner</code> will + * use to invoke the + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger} + * method + */ + ProcessContext getProcessContext(); + + /** + * Performs exactly the same operation as calling {@link #run(int)} with a + * value of 1. + */ + void run(); + + /** + * Performs the same operation as calling {@link #run(int, boolean)} with a + * value of <code>true</code> + * + * @param iterations number of iterations + */ + void run(int iterations); + + /** + * performs the same operation as calling {@link #run(int, boolean, int)} + * with a value of {@code iterations}, {@code stopOnFinish}, {@code true} + * + * @param iterations number of iterations + * @param stopOnFinish flag to stop when finished + */ + void run(int iterations, boolean stopOnFinish); + + /** + * This method runs the {@link Processor} <code>iterations</code> times, + * using the sequence of steps below: + * <ul> + * <li> + * If {@code initialize} is true, run all methods on the Processor that are + * annotated with the + * {@link nifi.processor.annotation.OnScheduled @OnScheduled} annotation. If + * any of these methods throws an Exception, the Unit Test will fail. + * </li> + * <li> + * Schedule the + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger} + * method to be invoked <code>iterations</code> times. The number of threads + * used to run these iterations is determined by the ThreadCount of this + * <code>TestRunner</code>. By default, the value is set to 1, but it can be + * modified by calling the {@link #setThreadCount(int)} method. + * </li> + * <li> + * As soon as the first thread finishes its execution of + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger}, + * all methods on the Processor that are annotated with the + * {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation + * are invoked. If any of these methods throws an Exception, the Unit Test + * will fail. + * </li> + * <li> + * Waits for all threads to finish execution. + * </li> + * <li> + * If and only if the value of <code>shutdown</code> is true: Call all + * methods on the Processor that is annotated with the + * {@link nifi.processor.annotation.OnStopped @OnStopped} annotation. + * </li> + * </ul> + * + * @param iterations number of iterations + * @param stopOnFinish whether or not to run the Processor methods that are + * annotated with {@link nifi.processor.annotation.OnStopped @OnStopped} + * @param initialize true if must initialize + */ + void run(int iterations, boolean stopOnFinish, final boolean initialize); + + /** + * This method runs the {@link Processor} <code>iterations</code> times, + * using the sequence of steps below: + * <ul> + * <li> + * If {@code initialize} is true, run all methods on the Processor that are + * annotated with the + * {@link nifi.processor.annotation.OnScheduled @OnScheduled} annotation. If + * any of these methods throws an Exception, the Unit Test will fail. + * </li> + * <li> + * Schedule the + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger} + * method to be invoked <code>iterations</code> times. The number of threads + * used to run these iterations is determined by the ThreadCount of this + * <code>TestRunner</code>. By default, the value is set to 1, but it can be + * modified by calling the {@link #setThreadCount(int)} method. + * </li> + * <li> + * As soon as the first thread finishes its execution of + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger}, + * all methods on the Processor that are annotated with the + * {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation + * are invoked. If any of these methods throws an Exception, the Unit Test + * will fail. + * </li> + * <li> + * Waits for all threads to finish execution. + * </li> + * <li> + * If and only if the value of <code>shutdown</code> is true: Call all + * methods on the Processor that is annotated with the + * {@link nifi.processor.annotation.OnStopped @OnStopped} annotation. + * </li> + * </ul> + * + * @param iterations number of iterations + * @param stopOnFinish whether or not to run the Processor methods that are + * annotated with {@link nifi.processor.annotation.OnStopped @OnStopped} + * @param initialize true if must initialize + * @param runWait indicates the amount of time in milliseconds that the framework should wait for + * processors to stop running before calling the {@link nifi.processor.annotation.OnUnscheduled @OnUnscheduled} annotation + */ + void run(int iterations, boolean stopOnFinish, final boolean initialize, final long runWait); + + /** + * Invokes all methods on the Processor that are annotated with the + * {@link nifi.processor.annotation.OnShutdown @OnShutdown} annotation. If + * any of these methods throws an Exception, the Unit Test will fail + */ + void shutdown(); + + /** + * Updates the number of threads that will be used to run the Processor when + * calling the {@link #run()} or {@link #run(int)} methods. + * + * @param threadCount num threads + */ + void setThreadCount(int threadCount); + + /** + * @return the currently configured number of threads that will be used to + * runt he Processor when calling the {@link #run()} or {@link #run(int)} + * methods + */ + int getThreadCount(); + + /** + * Updates the value of the property with the given PropertyDescriptor to + * the specified value IF and ONLY IF the value is valid according to the + * descriptor's validator. Otherwise, Assert.fail() is called, causing the + * unit test to fail + * + * @param propertyName name + * @param propertyValue value + * @return result + */ + ValidationResult setProperty(String propertyName, String propertyValue); + + /** + * Updates the value of the property with the given PropertyDescriptor to + * the specified value IF and ONLY IF the value is valid according to the + * descriptor's validator. Otherwise, Assert.fail() is called, causing the + * unit test to fail + * + * @param descriptor descriptor + * @param value value + * @return result + */ + ValidationResult setProperty(PropertyDescriptor descriptor, String value); + + /** + * Updates the value of the property with the given PropertyDescriptor to + * the specified value IF and ONLY IF the value is valid according to the + * descriptor's validator. Otherwise, Assert.fail() is called, causing the + * unit test to fail + * + * @param descriptor descriptor + * @param value allowable valu + * @return result + */ + ValidationResult setProperty(PropertyDescriptor descriptor, AllowableValue value); + + /** + * Sets the annotation data. + * + * @param annotationData data + */ + void setAnnotationData(String annotationData); + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship + * + * @param relationship to verify + */ + void assertAllFlowFilesTransferred(String relationship); + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship + * + * @param relationship to verify + */ + void assertAllFlowFilesTransferred(Relationship relationship); + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship and that the number of FlowFiles transferred is equal + * to <code>count</code> + * + * @param relationship to verify + * @param count number of expected transfers + */ + void assertAllFlowFilesTransferred(String relationship, int count); + + /** + * Asserts that all FlowFiles that were transferred were transferred to the + * given relationship and that the number of FlowFiles transferred is equal + * to <code>count</code> + * + * @param relationship to verify + * @param count number of expected transfers + */ + void assertAllFlowFilesTransferred(Relationship relationship, int count); + + /** + * Assert that the number of FlowFiles transferred to the given relationship + * is equal to the given count + * + * @param relationship to verify + * @param count number of expected transfers + */ + void assertTransferCount(Relationship relationship, int count); + + /** + * Assert that the number of FlowFiles transferred to the given relationship + * is equal to the given count + * + * @param relationship to verify + * @param count number of expected transfers + */ + void assertTransferCount(String relationship, int count); + + /** + * Assert that there are no FlowFiles left on the input queue. + */ + void assertQueueEmpty(); + + /** + * @return <code>true</code> if the Input Queue to the Processor is empty, + * <code>false</code> otherwise + */ + boolean isQueueEmpty(); + + /** + * Assert that there is at least one FlowFile left on the input queue. + */ + void assertQueueNotEmpty(); + + /** + * Assert that the currently configured set of properties/annotation data + * are valid + */ + void assertValid(); + + /** + * Assert that the currently configured set of properties/annotation data + * are NOT valid + */ + void assertNotValid(); + + /** + * Resets the Transfer Counts that indicate how many FlowFiles have been + * transferred to each Relationship and removes from memory any FlowFiles + * that have been transferred to this Relationships. This method should be + * called between successive calls to {@link #run(int) run} if the output is + * to be examined after each run. + */ + void clearTransferState(); + + /** + * Enqueues the given FlowFiles into the Processor's input queue + * + * @param flowFiles to enqueue + */ + void enqueue(FlowFile... flowFiles); + + /** + * Reads the content from the given {@link Path} into memory and creates a + * FlowFile from this content with no attributes and adds this FlowFile to + * the Processor's Input Queue + * + * @param path to read content from + * @throws IOException if unable to read content + */ + void enqueue(Path path) throws IOException; + + /** + * Reads the content from the given {@link Path} into memory and creates a + * FlowFile from this content with the given attributes and adds this + * FlowFile to the Processor's Input Queue + * + * @param path to read content from + * @param attributes attributes to use for new flow file + * @throws IOException if unable to read content + */ + void enqueue(Path path, Map<String, String> attributes) throws IOException; + + /** + * Copies the content from the given byte array into memory and creates a + * FlowFile from this content with no attributes and adds this FlowFile to + * the Processor's Input Queue + * + * @param data to enqueue + */ + void enqueue(byte[] data); + + /** + * Copies the content from the given byte array into memory and creates a + * FlowFile from this content with the given attributes and adds this + * FlowFile to the Processor's Input Queue + * + * @param data to enqueue + * @param attributes to use for enqueued items + */ + void enqueue(byte[] data, Map<String, String> attributes); + + /** + * Reads the content from the given {@link InputStream} into memory and + * creates a FlowFile from this content with no attributes and adds this + * FlowFile to the Processor's Input Queue + * + * @param data to source data from + */ + void enqueue(InputStream data); + + /** + * Reads the content from the given {@link InputStream} into memory and + * creates a FlowFile from this content with the given attributes and adds + * this FlowFile to the Processor's Input Queue + * + * @param data source of data + * @param attributes to use for flow files + */ + void enqueue(InputStream data, Map<String, String> attributes); + + /** + * Copies the contents of the given {@link MockFlowFile} into a byte array + * and returns that byte array. + * + * @param flowFile to get content for + * @return byte array of flowfile content + */ + byte[] getContentAsByteArray(MockFlowFile flowFile); + + /** + * Returns a List of FlowFiles in the order in which they were transferred + * to the given relationship + * + * @param relationship to get flowfiles for + * @return flowfiles transfered to given relationship + */ + List<MockFlowFile> getFlowFilesForRelationship(String relationship); + + /** + * Returns a List of FlowFiles in the order in which they were transferred + * to the given relationship + * + * @param relationship to get flowfiles for + * @return flowfiles transfered to given relationship + */ + List<MockFlowFile> getFlowFilesForRelationship(Relationship relationship); + + /** + * @return the {@link ProvenanceReporter} that will be used by the + * configured {@link Processor} for reporting Provenance Events + */ + ProvenanceReporter getProvenanceReporter(); + + /** + * @return the current size of the Processor's Input Queue + */ + QueueSize getQueueSize(); + + /** + * @param name of counter + * @return the current value of the counter with the specified name, or null + * if no counter exists with the specified name + */ + Long getCounterValue(String name); + + /** + * @return the number of FlowFiles that have been removed from the system + */ + int getRemovedCount(); + + /** + * Indicates to the Framework that the given Relationship should be + * considered "available", meaning that the queues of all Connections that + * contain this Relationship are not full. This is generally used only when + * dealing with Processors that use the + * {@link nifi.processor.annotation.TriggerWhenAnyDestinationAvailable} + * annotation. + * + * @param relationship to mark as available + */ + void setRelationshipAvailable(Relationship relationship); + + /** + * Indicates to the Framework that the given Relationship with the given + * name should be considered "available", meaning that the queues of all + * Connections that contain this Relationship are not full. This is + * generally used only when dealing with Processors that use the + * {@link nifi.processor.annotation.TriggerWhenAnyDestinationAvailable} + * + * @param relationshipName relationship name + */ + void setRelationshipAvailable(String relationshipName); + + /** + * Indicates to the Framework that the given Relationship should NOT be + * considered "available", meaning that the queue of at least one Connection + * that contain this Relationship is full. This is generally used only when + * dealing with Processors that use the + * {@link nifi.processor.annotation.TriggerWhenAnyDestinationAvailable} + * annotation. + * + * @param relationship to mark as unavailable + */ + void setRelationshipUnavailable(Relationship relationship); + + /** + * Indicates to the Framework that the Relationship with the given name + * should NOT be considered "available", meaning that the queue of at least + * one Connection that contain this Relationship is full. This is generally + * used only when dealing with Processors that use the + * {@link nifi.processor.annotation.TriggerWhenAnyDestinationAvailable} + * + * @param relationshipName name of relationship. + */ + void setRelationshipUnavailable(String relationshipName); + + /** + * Adds the given {@link ControllerService} to this TestRunner so that the + * configured Processor can access it using the given + * <code>identifier</code>. The ControllerService is not expected to be + * initialized, as the framework will create the appropriate + * {@link nifi.controller.ControllerServiceInitializationContext ControllerServiceInitializationContext} + * and initialize the ControllerService with no specified properties. + * + * This will call any method on the given Controller Service that is + * annotated with the + * {@link org.apache.nifi.annotation.lifecycle.OnAdded @OnAdded} annotation. + * + * @param identifier of service + * @param service the service + * @throws InitializationException ie + */ + void addControllerService(String identifier, ControllerService service) throws InitializationException; + + /** + * Adds the given {@link ControllerService} to this TestRunner so that the + * configured Processor can access it using the given + * <code>identifier</code>. The ControllerService is not expected to be + * initialized, as the framework will create the appropriate + * {@link nifi.controller.ControllerServiceInitializationContext ControllerServiceInitializationContext} + * and initialize the ControllerService with the given properties. + * + * This will call any method on the given Controller Service that is + * annotated with the + * {@link org.apache.nifi.annotation.lifecycle.OnAdded @OnAdded} annotation. + * + * @param identifier of service + * @param service the service + * @param properties service properties + * @throws InitializationException ie + */ + void addControllerService(String identifier, ControllerService service, Map<String, String> properties) throws InitializationException; + + /** + * <p> + * Marks the Controller Service as enabled so that it can be used by other + * components. + * </p> + * + * <p> + * This method will result in calling any method in the Controller Service + * that is annotated with the + * {@link org.apache.nifi.annotation.lifecycle.OnEnabled @OnEnabled} + * annotation. + * </p> + * + * @param service the service to enable + */ + void enableControllerService(ControllerService service); + + /** + * <p> + * Marks the Controller Service as disabled so that it cannot be used by + * other components. + * </p> + * + * <p> + * This method will result in calling any method in the Controller Service + * that is annotated with the + * {@link org.apache.nifi.annotation.lifecycle.OnDisabled @OnDisabled} + * annotation. + * </p> + * + * @param service the service to disable + */ + void disableControllerService(ControllerService service); + + /** + * @param service the service + * @return {@code true} if the given Controller Service is enabled, + * {@code false} if it is disabled + * + * @throws IllegalArgumentException if the given ControllerService is not + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. + */ + boolean isControllerServiceEnabled(ControllerService service); + + /** + * <p> + * Removes the Controller Service from the TestRunner. This will call any + * method on the ControllerService that is annotated with the + * {@link org.apache.nifi.annotation.lifecycle.OnRemoved @OnRemoved} + * annotation. + * </p> + * + * @param service the service + * + * @throws IllegalStateException if the ControllerService is not disabled + * @throws IllegalArgumentException if the given ControllerService is not + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. + * + */ + void removeControllerService(ControllerService service); + + /** + * Sets the given property on the given ControllerService + * + * @param service to modify + * @param property to modify + * @param value value to use + * @return result + * + * @throws IllegalStateException if the ControllerService is not disabled + * @throws IllegalArgumentException if the given ControllerService is not + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. + * + */ + ValidationResult setProperty(ControllerService service, PropertyDescriptor property, String value); + + /** + * Sets the given property on the given ControllerService + * + * @param service to modify + * @param property to modify + * @param value value to use + * @return result + * + * @throws IllegalStateException if the ControllerService is not disabled + * @throws IllegalArgumentException if the given ControllerService is not + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. + * + */ + ValidationResult setProperty(ControllerService service, PropertyDescriptor property, AllowableValue value); + + /** + * Sets the property with the given name on the given ControllerService + * + * @param service to modify + * @param propertyName to modify + * @param value value to use + * @return result + * + * @throws IllegalStateException if the ControllerService is not disabled + * @throws IllegalArgumentException if the given ControllerService is not + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. + * + */ + ValidationResult setProperty(ControllerService service, String propertyName, String value); + + /** + * Sets the annontation data of the given service to the provided annotation + * data. + * + * @param service to modify + * @param annotationData the data + * + * @throws IllegalStateException if the Controller Service is not disabled + * + * @throws IllegalArgumentException if the given ControllerService is not + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. + */ + void setAnnotationData(ControllerService service, String annotationData); + + /** + * @param identifier of controller service + * @return the {@link ControllerService} that is registered with the given + * identifier, or <code>null</code> if no Controller Service exists with the + * given identifier + */ + ControllerService getControllerService(String identifier); + + /** + * Assert that the currently configured set of properties/annotation data + * are valid for the given Controller Service. + * + * @param service the service to validate + * @throws IllegalArgumentException if the given ControllerService is not + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. + */ + void assertValid(ControllerService service); + + /** + * Assert that the currently configured set of properties/annotation data + * are NOT valid for the given Controller Service. + * + * @param service the service to validate + * @throws IllegalArgumentException if the given ControllerService is not + * known by this TestRunner (i.e., it has not been added via the + * {@link #addControllerService(String, ControllerService)} or + * {@link #addControllerService(String, ControllerService, Map)} method or + * if the Controller Service has been removed via the + * {@link #removeControllerService(ControllerService)} method. + * + */ + void assertNotValid(ControllerService service); + + /** + * @param <T> type of service + * @param identifier identifier of service + * @param serviceType type of service + * @return the {@link ControllerService} that is registered with the given + * identifier, cast as the provided service type, or <code>null</code> if no + * Controller Service exists with the given identifier + * + * @throws ClassCastException if the identifier given is registered for a + * Controller Service but that Controller Service is not of the type + * specified + */ + <T extends ControllerService> T getControllerService(String identifier, Class<T> serviceType); + + /** + * Specifies whether or not the TestRunner will validate the use of + * Expression Language. By default, the value is <code>true</code>, which + * means that an Exception will be thrown if the Processor attempts to + * obtain the configured value of a Property without calling + * {@link nifi.components.PropertyValue#evaluateAttributeExpressions evaluateAttributeExpressions} + * on the Property Value or if + * {@link nifi.components.PropertyValue#evaluateAttributeExpressions evaluateAttributeExpressions} + * is called but the PropertyDescriptor indicates that the Expression + * Language is not supported. + * + * <p> + * <b>See Also: + * </b>{@link PropertyDescriptor.Builder#expressionLanguageSupported(boolean)} + * </p> + * + * @param validate whether there is any need to validate the EL was used + */ + void setValidateExpressionUsage(boolean validate); + + /** + * Removes the {@link PropertyDescriptor} from the {@link ProcessContext}, + * effectively setting its value to null. + * + * @param descriptor of property to remove + * @return true if removed + */ + boolean removeProperty(PropertyDescriptor descriptor); + + /** + * Returns a {@link List} of all {@link ProvenanceEventRecord}s that were + * emitted by the Processor + * + * @return a List of all Provenance Events that were emitted by the + * Processor + */ + List<ProvenanceEventRecord> getProvenanceEvents(); + + /** + * Clears the Provenance Events that have been emitted by the Processor + */ + void clearProvenanceEvents(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java new file mode 100644 index 0000000..f2b0b23 --- /dev/null +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.processor.Processor; + +public class TestRunners { + + public static TestRunner newTestRunner(final Processor processor) { + return new StandardProcessorTestRunner(processor); + } + + public static TestRunner newTestRunner(final Class<? extends Processor> processorClass) { + try { + return newTestRunner(processorClass.newInstance()); + + } catch (final Exception e) { + System.err.println("Could not instantiate instance of class " + processorClass.getName() + " due to: " + e); + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java b/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java new file mode 100644 index 0000000..6b403af --- /dev/null +++ b/nifi-mock/src/test/java/org/apache/nifi/util/CurrentTestStandardProcessorTestRunner.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.Assert; +import org.junit.Test; + +public class CurrentTestStandardProcessorTestRunner { + + /** + * This test will verify that all iterations of the run are finished before unscheduled is called + */ + @Test + public void testOnScheduledCalledAfterRunFinished() { + SlowRunProcessor processor = new SlowRunProcessor(); + StandardProcessorTestRunner runner = new StandardProcessorTestRunner(processor); + final int iterations = 5; + runner.run(iterations); + // if the counter is not equal to iterations, the the processor must have been unscheduled + // before all the run calls were made, that would be bad. + Assert.assertEquals(iterations, processor.getCounter()); + } + + /** + * This processor simulates a "slow" processor that checks whether it is scheduled before doing something + * + * + */ + private static class SlowRunProcessor extends AbstractProcessor { + + private int counter = 0; + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + try { + // be slow + Thread.sleep(50); + // make sure we are still scheduled + if (isScheduled()) { + // increment counter + ++counter; + } + } catch (InterruptedException e) { + } + + } + + public int getCounter() { + return counter; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java new file mode 100644 index 0000000..323a357 --- /dev/null +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestStandardProcessorTestRunner.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore("This should not be enabled until we actually fail processor unit tests for using deprecated methods, which should happen in 0.1.0") +public class TestStandardProcessorTestRunner { + + @Test(expected = AssertionError.class) + public void testFailOnDeprecatedTypeAnnotation() { + new StandardProcessorTestRunner(new DeprecatedAnnotation()); + } + + @Test + public void testDoesNotFailOnNonDeprecatedTypeAnnotation() { + new StandardProcessorTestRunner(new NewAnnotation()); + } + + @Test(expected = AssertionError.class) + public void testFailOnDeprecatedMethodAnnotation() { + new StandardProcessorTestRunner(new DeprecatedMethodAnnotation()); + } + + @Test + public void testDoesNotFailOnNonDeprecatedMethodAnnotation() { + new StandardProcessorTestRunner(new NewMethodAnnotation()); + } + + @SuppressWarnings("deprecation") + @org.apache.nifi.processor.annotation.Tags({"deprecated"}) + private static class DeprecatedAnnotation extends AbstractProcessor { + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + } + + @org.apache.nifi.annotation.documentation.Tags({"deprecated"}) + private static class NewAnnotation extends AbstractProcessor { + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + } + + private static class NewMethodAnnotation extends AbstractProcessor { + + @org.apache.nifi.annotation.lifecycle.OnScheduled + public void dummy() { + + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + } + + private static class DeprecatedMethodAnnotation extends AbstractProcessor { + + @SuppressWarnings("deprecation") + @org.apache.nifi.processor.annotation.OnScheduled + public void dummy() { + + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + } +}