http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java new file mode 100644 index 0000000..da242b8 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; + +import java.util.Collection; + +/** + * ProvenanceReporter generates and records Provenance-related events. A + * ProvenanceReporter is always tied to a {@link ProcessSession}. Any events + * that are generated are reported to Provenance only after the session has been + * committed. If the session is rolled back, the events related to that session + * are purged. 
+ */ +public interface ProvenanceReporter { + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#RECEIVE RECEIVE} that indicates that the given + * FlowFile was created from data received from an external source. + * + * @param flowFile the FlowFile that was received + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + */ + void receive(FlowFile flowFile, String transitUri); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#RECEIVE RECEIVE} that indicates that the given + * FlowFile was created from data received from the specified URI and that + * the source system used the specified identifier (a URI with namespace) to + * refer to the data. + * + * @param flowFile the FlowFile that was received + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param sourceSystemFlowFileIdentifier the URI/identifier that the source + * system uses to refer to the data; if this value is non-null and is not a + * URI, the prefix "urn:tdo:" will be used to form a URI. + */ + void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#RECEIVE RECEIVE} that indicates that the given + * FlowFile was created from data received from an external source. 
+ * + * @param flowFile the FlowFile that was received + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param transmissionMillis the number of milliseconds taken to transfer + * the data + */ + void receive(FlowFile flowFile, String transitUri, long transmissionMillis); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#RECEIVE RECEIVE} that indicates that the given + * FlowFile was created from data received from an external source and + * provides additional details about the receipt of the FlowFile, such as a + * remote system's Distinguished Name. + * + * @param flowFile the FlowFile that was received + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param details details about the receive event; for example, it may be + * relevant to include the DN of the sending system + * @param transmissionMillis the number of milliseconds taken to transfer + * the data + */ + void receive(FlowFile flowFile, String transitUri, String details, long transmissionMillis); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#RECEIVE RECEIVE} that indicates that the given + * FlowFile was created from data received from an external source and + * provides additional details about the receipt of the FlowFile, such as a + * remote system's Distinguished Name. 
+ * + * @param flowFile the FlowFile that was received + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param sourceSystemFlowFileIdentifier the URI/identifier that the source + * system uses to refer to the data; if this value is non-null and is not a + * URI, the prefix "urn:tdo:" will be used to form a URI. + * @param details details about the receive event; for example, it may be + * relevant to include the DN of the sending system + * @param transmissionMillis the number of milliseconds taken to transfer + * the data + */ + void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier, String details, long transmissionMillis); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND} + * that indicates that a copy of the given FlowFile was sent to an external + * destination. The external destination may be a remote system or may be a + * local destination, such as the local file system but is external to NiFi. + * + * @param flowFile the FlowFile that was sent + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + */ + void send(FlowFile flowFile, String transitUri); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND} + * that indicates that a copy of the given FlowFile was sent to an external + * destination. 
The external destination may be a remote system or may be a + * local destination, such as the local file system but is external to NiFi. + * + * @param flowFile the FlowFile that was sent + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param details additional details related to the SEND event, such as a + * remote system's Distinguished Name + */ + void send(FlowFile flowFile, String transitUri, String details); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND} + * that indicates that a copy of the given FlowFile was sent to an external + * destination. The external destination may be a remote system or may be a + * local destination, such as the local file system but is external to NiFi. + * + * @param flowFile the FlowFile that was sent + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param transmissionMillis the number of milliseconds spent sending the + * data to the remote system + */ + void send(FlowFile flowFile, String transitUri, long transmissionMillis); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND} + * that indicates that a copy of the given FlowFile was sent to an external + * destination. The external destination may be a remote system or may be a + * local destination, such as the local file system but is external to NiFi. 
+ * + * @param flowFile the FlowFile that was sent + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param details additional details related to the SEND event, such as a + * remote system's Distinguished Name + * @param transmissionMillis the number of milliseconds spent sending the + * data to the remote system + */ + void send(FlowFile flowFile, String transitUri, String details, long transmissionMillis); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND} + * that indicates that a copy of the given FlowFile was sent to an external + * destination. The external destination may be a remote system or may be a + * local destination, such as the local file system but is external to NiFi. + * + * @param flowFile the FlowFile that was sent + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param force if <code>true</code>, this event will be added to the + * Provenance Repository immediately and will still be persisted if the + * {@link ProcessSession} to which this + * ProvenanceReporter is associated is rolled back. Otherwise, the Event + * will be recorded only on a successful session commit. 
+ */ + void send(FlowFile flowFile, String transitUri, boolean force); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND} + * that indicates that a copy of the given FlowFile was sent to an external + * destination. The external destination may be a remote system or may be a + * local destination, such as the local file system but is external to NiFi. + * + * @param flowFile the FlowFile that was sent + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param details additional details related to the SEND event, such as a + * remote system's Distinguished Name + * @param force if <code>true</code>, this event will be added to the + * Provenance Repository immediately and will still be persisted if the + * {@link ProcessSession} to which this + * ProvenanceReporter is associated is rolled back. Otherwise, the Event + * will be recorded only on a successful session commit. + */ + void send(FlowFile flowFile, String transitUri, String details, boolean force); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND} + * that indicates that a copy of the given FlowFile was sent to an external + * destination. The external destination may be a remote system or may be a + * local destination, such as the local file system but is external to NiFi. + * + * @param flowFile the FlowFile that was sent + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. 
The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param transmissionMillis the number of milliseconds spent sending the + * data to the remote system + * @param force if <code>true</code>, this event will be added to the + * Provenance Repository immediately and will still be persisted if the + * {@link ProcessSession} to which this + * ProvenanceReporter is associated is rolled back. Otherwise, the Event + * will be recorded only on a successful session commit. + */ + void send(FlowFile flowFile, String transitUri, long transmissionMillis, boolean force); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND} + * that indicates that a copy of the given FlowFile was sent to an external + * destination. The external destination may be a remote system or may be a + * local destination, such as the local file system but is external to NiFi. + * + * @param flowFile the FlowFile that was sent + * @param transitUri A URI that provides information about the System and + * Protocol information over which the transfer occurred. The intent of this + * field is such that both the sender and the receiver can publish the + * events to an external Enterprise-wide system that is then able to + * correlate the SEND and RECEIVE events. + * @param details additional details related to the SEND event, such as a + * remote system's Distinguished Name + * @param transmissionMillis the number of milliseconds spent sending the + * data to the remote system + * @param force if <code>true</code>, this event will be added to the + * Provenance Repository immediately and will still be persisted if the + * {@link ProcessSession} to which this + * ProvenanceReporter is associated is rolled back. 
Otherwise, the Event + * will be recorded only on a successful session commit. + */ + void send(FlowFile flowFile, String transitUri, String details, long transmissionMillis, boolean force); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#ADDINFO ADDINFO} that provides a linkage + * between the given FlowFile and alternate identifier. This information can + * be useful if published to an external, enterprise-wide Provenance + * tracking system that is able to associate the data between different + * processes. + * + * @param flowFile the FlowFile for which the association should be made + * @param alternateIdentifierNamespace the namespace of the alternate system + * @param alternateIdentifier the identifier that the alternate system uses + * when referring to the data that is encompassed by this FlowFile + */ + void associate(FlowFile flowFile, String alternateIdentifierNamespace, String alternateIdentifier); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#FORK FORK} + * that establishes that the given parent was split into multiple child + * FlowFiles. In general, this method does not need to be called by + * Processors, as the ProcessSession will handle this automatically for you + * when calling {@link ProcessSession#create(FlowFile)}. + * + * @param parent the FlowFile from which the children are derived + * @param children the FlowFiles that are derived from the parent. + */ + void fork(FlowFile parent, Collection<FlowFile> children); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#FORK FORK} + * that establishes that the given parent was split into multiple child + * FlowFiles. In general, this method does not need to be called by + * Processors, as the ProcessSession will handle this automatically for you + * when calling {@link ProcessSession#create(FlowFile)}. 
+ * + * @param parent the FlowFile from which the children are derived + * @param children the FlowFiles that are derived from the parent. + * @param details any details pertinent to the fork + */ + void fork(FlowFile parent, Collection<FlowFile> children, String details); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#FORK FORK} + * that establishes that the given parent was split into multiple child + * FlowFiles. In general, this method does not need to be called by + * Processors, as the ProcessSession will handle this automatically for you + * when calling {@link ProcessSession#create(FlowFile)}. + * + * @param parent the FlowFile from which the children are derived + * @param children the FlowFiles that are derived from the parent. + * @param forkDuration the number of milliseconds that it took to perform + * the task + */ + void fork(FlowFile parent, Collection<FlowFile> children, long forkDuration); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#FORK FORK} + * that establishes that the given parent was split into multiple child + * FlowFiles. In general, this method does not need to be called by + * Processors, as the ProcessSession will handle this automatically for you + * when calling {@link ProcessSession#create(FlowFile)}. + * + * @param parent the FlowFile from which the children are derived + * @param children the FlowFiles that are derived from the parent. + * @param details any details pertinent to the fork + * @param forkDuration the number of milliseconds that it took to perform + * the task + */ + void fork(FlowFile parent, Collection<FlowFile> children, String details, long forkDuration); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#JOIN JOIN} + * that establishes that the given parents were joined together to create a + * new child FlowFile. 
In general, this method does not need to be called by + * Processors, as the ProcessSession will handle this automatically for you + * when calling {@link ProcessSession#create(FlowFile)}. + * + * @param parents the FlowFiles that are being joined together to create the + * child + * @param child the FlowFile that is being created by joining the parents + */ + void join(Collection<FlowFile> parents, FlowFile child); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#JOIN JOIN} + * that establishes that the given parents were joined together to create a + * new child FlowFile. In general, this method does not need to be called by + * Processors, as the ProcessSession will handle this automatically for you + * when calling {@link ProcessSession#create(FlowFile)}. + * + * @param parents the FlowFiles that are being joined together to create the + * child + * @param child the FlowFile that is being created by joining the parents + * @param details any details pertinent to the event + */ + void join(Collection<FlowFile> parents, FlowFile child, String details); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#JOIN JOIN} + * that establishes that the given parents were joined together to create a + * new child FlowFile. In general, this method does not need to be called by + * Processors, as the ProcessSession will handle this automatically for you + * when calling {@link ProcessSession#create(FlowFile)}. 
+ * + * @param parents the FlowFiles that are being joined together to create the + * child + * @param child the FlowFile that is being created by joining the parents + * @param joinDuration the number of milliseconds that it took to join the + * FlowFiles + */ + void join(Collection<FlowFile> parents, FlowFile child, long joinDuration); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#JOIN JOIN} + * that establishes that the given parents were joined together to create a + * new child FlowFile. In general, this method does not need to be called by + * Processors, as the ProcessSession will handle this automatically for you + * when calling {@link ProcessSession#create(FlowFile)}. + * + * @param parents the FlowFiles that are being joined together to create the + * child + * @param child the FlowFile that is being created by joining the parents + * @param details any details pertinent to the event + * @param joinDuration the number of milliseconds that it took to join the + * FlowFiles + */ + void join(Collection<FlowFile> parents, FlowFile child, String details, long joinDuration); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#CLONE CLONE} + * that establishes that the given child is an exact replica of the parent. + * In general, this method does not need to be called by Processors, as the + * {@link ProcessSession} will handle this automatically for you when + * calling {@link ProcessSession#clone(FlowFile)}. + * + * @param parent the FlowFile that was cloned + * @param child the clone + */ + void clone(FlowFile parent, FlowFile child); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#CONTENT_MODIFIED CONTENT_MODIFIED} that + * indicates that the content of the given FlowFile has been modified. One + * of the <code>modifyContent</code> methods should be called any time that + * the contents of a FlowFile are modified. 
+ * + * @param flowFile the FlowFile whose content is being modified + */ + void modifyContent(FlowFile flowFile); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#CONTENT_MODIFIED CONTENT_MODIFIED} that + * indicates that the content of the given FlowFile has been modified. One + * of the <code>modifyContent</code> methods should be called any time that + * the contents of a FlowFile are modified. + * + * @param flowFile the FlowFile whose content is being modified + * @param details Any details about how the content of the FlowFile has been + * modified. Details should not be specified if they can be inferred by + * other information in the event, such as the name of the Processor, as + * specifying this information will add undue overhead + */ + void modifyContent(FlowFile flowFile, String details); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#CONTENT_MODIFIED CONTENT_MODIFIED} that + * indicates that the content of the given FlowFile has been modified. One + * of the <code>modifyContent</code> methods should be called any time that + * the contents of a FlowFile are modified. + * + * @param flowFile the FlowFile whose content is being modified + * @param processingMillis the number of milliseconds spent processing the + * FlowFile + */ + void modifyContent(FlowFile flowFile, long processingMillis); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#CONTENT_MODIFIED CONTENT_MODIFIED} that + * indicates that the content of the given FlowFile has been modified. One + * of the <code>modifyContent</code> methods should be called any time that + * the contents of a FlowFile are modified. + * + * @param flowFile the FlowFile whose content is being modified + * @param details Any details about how the content of the FlowFile has been + * modified. 
Details should not be specified if they can be inferred by + * other information in the event, such as the name of the Processor, as + * specifying this information will add undue overhead + * @param processingMillis the number of milliseconds spent processing the + * FlowFile + */ + void modifyContent(FlowFile flowFile, String details, long processingMillis); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#ATTRIBUTES_MODIFIED ATTRIBUTES_MODIFIED} that + * indicates that the Attributes of the given FlowFile were updated. It is + * not necessary to emit such an event for a FlowFile if other Events are + * already emitted by a Processor. For example, one should not call both + * {@link #modifyContent(FlowFile)} and {@link #modifyAttributes(FlowFile)} + * for the same FlowFile in the same Processor. Rather, the Processor should + * call just the {@link #modifyContent(FlowFile)}, as the call to + * {@link #modifyContent(FlowFile)} will generate a Provenance Event that + * already contains all FlowFile attributes. As such, emitting another event + * that contains those attributes is unneeded and can result in a + * significant amount of overhead for storage and processing. + * + * @param flowFile the FlowFile whose attributes were modified + */ + void modifyAttributes(FlowFile flowFile); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#ATTRIBUTES_MODIFIED ATTRIBUTES_MODIFIED} that + * indicates that the Attributes of the given FlowFile were updated. It is + * not necessary to emit such an event for a FlowFile if other Events are + * already emitted by a Processor. For example, one should not call both + * {@link #modifyContent(FlowFile)} and {@link #modifyAttributes(FlowFile)} + * for the same FlowFile in the same Processor. 
Rather, the Processor should + * call just the {@link #modifyContent(FlowFile)}, as the call to + * {@link #modifyContent(FlowFile)} will generate a Provenance Event that + * already contains all FlowFile attributes. As such, emitting another event + * that contains those attributes is unneeded and can result in a + * significant amount of overhead for storage and processing. + * + * @param flowFile the FlowFile whose attributes were modified + * @param details any details that should be provided about the attribute + * modification + */ + void modifyAttributes(FlowFile flowFile, String details); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#ROUTE ROUTE} + * that indicates that the given FlowFile was routed to the given + * {@link Relationship}. <b>Note: </b> this Event is intended for Processors + * whose sole job it is to route FlowFiles and should NOT be used as a way + * to indicate that the given FlowFile was routed to a standard 'success' or + * 'failure' relationship. Doing so can be problematic, as DataFlow Managers + * often will loop 'failure' relationships back to the same processor. As + * such, emitting a Route event to indicate that a FlowFile was routed to + * 'failure' can result in creating thousands of Provenance Events for a + * given FlowFile, resulting in a very difficult-to-understand lineage. + * + * @param flowFile the FlowFile being routed + * @param relationship the Relationship to which the FlowFile was routed + */ + void route(FlowFile flowFile, Relationship relationship); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#ROUTE ROUTE} + * that indicates that the given FlowFile was routed to the given + * {@link Relationship}. 
<b>Note: </b> this Event is intended ONLY for + * Processors whose sole job it is to route FlowFiles and should NOT be used + * as a way to indicate that the given FlowFile was routed to a standard + * 'success' or 'failure' relationship. Doing so can be problematic, as + * DataFlow Managers often will loop 'failure' relationships back to the + * same processor. As such, emitting a Route event to indicate that a + * FlowFile was routed to 'failure' can result in creating thousands of + * Provenance Events for a given FlowFile, resulting in a very + * difficult-to-understand lineage. 
+ * + * @param flowFile the FlowFile being routed + * @param relationship the Relationship to which the FlowFile was routed + * @param details any details pertinent to the Route event, such as why the + * FlowFile was routed to the specified Relationship + */ + void route(FlowFile flowFile, Relationship relationship, String details); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#ROUTE ROUTE} + * that indicates that the given FlowFile was routed to the given + * {@link Relationship}. <b>Note: </b> this Event is intended ONLY for + * Processors whose sole job it is to route FlowFiles and should NOT be used + * as a way to indicate that the given FlowFile was routed to a standard + * 'success' or 'failure' relationship. Doing so can be problematic, as + * DataFlow Managers often will loop 'failure' relationships back to the + * same processor. As such, emitting a Route event to indicate that a + * FlowFile was routed to 'failure' can result in creating thousands of + * Provenance Events for a given FlowFile, resulting in a very + * difficult-to-understand lineage. + * + * @param flowFile the FlowFile being routed + * @param relationship the Relationship to which the FlowFile was routed + * @param processingDuration the number of milliseconds that it took to + * determine how to route the FlowFile + */ + void route(FlowFile flowFile, Relationship relationship, long processingDuration); + + /** + * Emits a Provenance Event of type {@link ProvenanceEventType#ROUTE ROUTE} + * that indicates that the given FlowFile was routed to the given + * {@link Relationship}. <b>Note: </b> this Event is intended ONLY for + * Processors whose sole job it is to route FlowFiles and should NOT be used + * as a way to indicate that the given FlowFile was routed to a standard + * 'success' or 'failure' relationship. Doing so can be problematic, as + * DataFlow Managers often will loop 'failure' relationships back to the + * same processor. 
As such, emitting a Route event to indicate that a + * FlowFile was routed to 'failure' can result in creating thousands of + * Provenance Events for a given FlowFile, resulting in a very + * difficult-to-understand lineage. + * + * @param flowFile the FlowFile being routed + * @param relationship the Relationship to which the FlowFile was routed + * @param details any details pertinent to the Route event, such as why the + * FlowFile was routed to the specified Relationship + * @param processingDuration the number of milliseconds that it took to + * determine how to route the FlowFile + */ + void route(FlowFile flowFile, Relationship relationship, String details, long processingDuration); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#CREATE CREATE} that indicates that the given + * FlowFile was created by NiFi from data that was not received from an + * external entity. If the data was received from an external source, use + * the {@link #receive(FlowFile, String)} event instead. + * + * @param flowFile the FlowFile that was created + */ + void create(FlowFile flowFile); + + /** + * Emits a Provenance Event of type + * {@link ProvenanceEventType#CREATE CREATE} that indicates that the given + * FlowFile was created by NiFi from data that was not received from an + * external entity. If the data was received from an external source, use + * the {@link #receive(FlowFile, String, String, long)} event instead. 
+ * + * @param flowFile the FlowFile that was created + * @param details any relevant details about the CREATE event + */ + void create(FlowFile flowFile, String details); +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java new file mode 100644 index 0000000..e754ff7 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageResult.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.lineage; + +import java.util.Date; +import java.util.List; + +/** + * The (possibly partial) outcome of a lineage computation: the nodes and + * edges of the lineage graph plus expiration, error and progress information. + */ +public interface ComputeLineageResult { + + /** + * @return all nodes for the graph + */ + public List<LineageNode> getNodes(); + + /** + * @return all links (edges) for the graph + */ + public List<LineageEdge> getEdges(); + + /** + * @return the date at which this ComputeLineageResult will expire + */ + Date getExpiration(); + + /** + * @return If an error occurred while computing the lineage, this will return the + * serialized error; otherwise, returns <code>null</code> + */ + String getError(); + + /** + * @return an integer between 0 and 100 (inclusive) that indicates what + * percentage of completion the computation has reached + */ + int getPercentComplete(); + + /** + * @return <code>true</code> if the lineage computation has finished running, + * <code>false</code> otherwise + */ + boolean isFinished(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageSubmission.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageSubmission.java b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageSubmission.java new file mode 100644 index 0000000..a9df26c --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ComputeLineageSubmission.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lineage; + +import java.util.Collection; +import java.util.Date; + +/** + * A handle to a lineage computation that has been submitted. It exposes the + * (possibly partial) result, the submission metadata and a way to cancel the + * computation. + */ +public interface ComputeLineageSubmission { + + /** + * @return the {@link ComputeLineageResult} that contains the results. The + * results may be partial if a call to + * {@link ComputeLineageResult#isFinished()} returns <code>false</code> + */ + ComputeLineageResult getResult(); + + /** + * @return the date at which this lineage was submitted + */ + Date getSubmissionTime(); + + /** + * @return the generated identifier for this lineage result + */ + String getLineageIdentifier(); + + /** + * Cancels the lineage computation + */ + void cancel(); + + /** + * @return <code>true</code> if {@link #cancel()} has been called, + * <code>false</code> otherwise + */ + boolean isCanceled(); + + /** + * @return the type of Lineage Computation that was submitted + */ + LineageComputationType getLineageComputationType(); + + /** + * @return If the Lineage Computation Type of this submission is + * {@link LineageComputationType#EXPAND_CHILDREN} or + * {@link LineageComputationType#EXPAND_PARENTS}, indicates the ID of the + * event that is to be expanded; otherwise, returns <code>null</code> + */ + Long getExpandedEventId(); + + /** + * @return the UUIDs of all FlowFiles that are encapsulated in this lineage + * computation submission + */ + Collection<String> getLineageFlowFileUuids(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/Lineage.java
---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/Lineage.java b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/Lineage.java new file mode 100644 index 0000000..ff5fee7 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/Lineage.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.lineage; + +import java.util.List; + +/** + * A data structure representing a directed graph that depicts the lineage + * of a FlowFile and all events that occurred for the FlowFile. + */ +public interface Lineage { + + /** + * @return all nodes (vertices) of the lineage graph + */ + public List<LineageNode> getNodes(); + + /** + * @return all links (edges) of the lineage graph + */ + public List<LineageEdge> getEdges(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageComputationType.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageComputationType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageComputationType.java new file mode 100644 index 0000000..be74324 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageComputationType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.provenance.lineage; + +/** + * Enumerates the kinds of lineage computation that can be submitted. + * NOTE(review): judging by the constant names, FLOWFILE_LINEAGE computes the + * lineage of FlowFiles while EXPAND_PARENTS and EXPAND_CHILDREN expand an + * existing event towards its parents or children; confirm against the + * repository implementation. + */ +public enum LineageComputationType { + + FLOWFILE_LINEAGE, + EXPAND_PARENTS, + EXPAND_CHILDREN; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageEdge.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageEdge.java b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageEdge.java new file mode 100644 index 0000000..dba56f3 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageEdge.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.provenance.lineage; + +/** + * A directed link in a lineage graph that connects a source + * {@link LineageNode} to a destination {@link LineageNode}. + */ +public interface LineageEdge { + + /** + * @return the UUID associated with this edge. NOTE(review): presumably the + * UUID of the FlowFile that the edge pertains to; confirm with the + * implementations + */ + String getUuid(); + + /** + * @return the node at which this edge starts + */ + LineageNode getSource(); + + /** + * @return the node at which this edge ends + */ + LineageNode getDestination(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNode.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNode.java b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNode.java new file mode 100644 index 0000000..56e865f --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNode.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.provenance.lineage; + +/** + * A node (vertex) in a lineage graph. Each node has a type, an identifier, + * the UUID of the FlowFile it was created for, and a timestamp. + */ +public interface LineageNode { + + /** + * This method is marked as deprecated. + * + * @return the identifier of the Clustered NiFi Node that generated the + * event + */ + @Deprecated + String getClusterNodeIdentifier(); + + /** + * @return the type of the LineageNode + */ + LineageNodeType getNodeType(); + + /** + * @return the UUID of the FlowFile for which this Node was created + */ + String getFlowFileUuid(); + + /** + * @return the UUID for this LineageNode + */ + String getIdentifier(); + + /** + * @return the timestamp that corresponds to this Node. The meaning of the + * timestamp may differ between implementations. For example, a + * {@link ProvenanceEventLineageNode}'s timestamp indicates the time at + * which the event occurred. However, for a Node that represents a + * FlowFile, for example, the timestamp may represent the time at which the + * FlowFile was created + */ + long getTimestamp(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNodeType.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNodeType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNodeType.java new file mode 100644 index 0000000..67e0d61 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/LineageNodeType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lineage; + +/** + * Enumerates the kinds of {@link LineageNode}: a node that stands for a + * FlowFile or a node that stands for a Provenance Event. + */ +public enum LineageNodeType { + + FLOWFILE_NODE, + PROVENANCE_EVENT_NODE; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ProvenanceEventLineageNode.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ProvenanceEventLineageNode.java b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ProvenanceEventLineageNode.java new file mode 100644 index 0000000..f490496 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/lineage/ProvenanceEventLineageNode.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.provenance.lineage; + +import java.util.List; + +import org.apache.nifi.provenance.ProvenanceEventType; + +/** + * A {@link LineageNode} that represents a Provenance Event in a lineage graph. + */ +public interface ProvenanceEventLineageNode extends LineageNode { + + /** + * @return the type of the Provenance Event that this node represents + */ + ProvenanceEventType getEventType(); + + /** + * @return the identifier of the event. NOTE(review): presumably the ID + * assigned to the event by the provenance repository; confirm + */ + long getEventIdentifier(); + + /** + * @return a list of parent UUIDs. NOTE(review): presumably the UUIDs of the + * parent FlowFiles of the event; confirm with the implementations + */ + List<String> getParentUuids(); + + /** + * @return a list of child UUIDs. NOTE(review): presumably the UUIDs of the + * child FlowFiles of the event; confirm with the implementations + */ + List<String> getChildUuids(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/search/Query.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/search/Query.java b/nifi-api/src/main/java/org/apache/nifi/provenance/search/Query.java new file mode 100644 index 0000000..3519c14 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/search/Query.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.provenance.search; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Objects; + + +/** + * A mutable description of a provenance search: an identifier, a list of + * {@link SearchTerm}s, a start and end date, minimum and maximum file sizes + * and a cap on the number of results. The dates and file sizes are + * <code>null</code> until set; the result cap is 1000 until changed. + */ +public class Query { + + private final String identifier; + private final List<SearchTerm> searchTerms = new ArrayList<>(); + private Date startDate; + private Date endDate; + private String minFileSize; + private String maxFileSize; + private int maxResults = 1000; + + /** + * @param identifier the identifier of this query; must not be null + * @throws NullPointerException if the identifier is null + */ + public Query(final String identifier) { + this.identifier = Objects.requireNonNull(identifier); + } + + /** + * @return the identifier given at construction time + */ + public String getIdentifier() { + return identifier; + } + + /** + * Appends the given term to this query's list of search terms. No null + * check or de-duplication is performed here. + * + * @param searchTerm the term to add + */ + public void addSearchTerm(final SearchTerm searchTerm) { + searchTerms.add(searchTerm); + } + + /** + * @return an unmodifiable view of the search terms added so far + */ + public List<SearchTerm> getSearchTerms() { + return Collections.unmodifiableList(searchTerms); + } + + public Date getStartDate() { + return startDate; + } + + /** + * @param startDate the start date; the reference is stored as-is, without + * a defensive copy + */ + public void setStartDate(Date startDate) { + this.startDate = startDate; + } + + public Date getEndDate() { + return endDate; + } + + /** + * @param endDate the end date; the reference is stored as-is, without a + * defensive copy + */ + public void setEndDate(Date endDate) { + this.endDate = endDate; + } + + /** + * @return the maximum number of results; 1000 unless changed via + * {@link #setMaxResults(int)} + */ + public int getMaxResults() { + return maxResults; + } + + /** + * @param maxResults the maximum number of results; the value is stored + * without validation + */ + public void setMaxResults(int maxResults) { + this.maxResults = maxResults; + } + + /** + * @param fileSize the minimum file size, as a String. NOTE(review): + * presumably a data-size expression that the repository parses; confirm + * the expected format + */ + public void setMinFileSize(final String fileSize) { + this.minFileSize = fileSize; + } + + public String getMinFileSize() { + return minFileSize; + } + + /** + * @param fileSize the maximum file size, as a String. NOTE(review): + * presumably a data-size expression that the repository parses; confirm + * the expected format + */ + public void setMaxFileSize(final String fileSize) { + this.maxFileSize = fileSize; + } + + public String getMaxFileSize() { + return maxFileSize; + } + + /** + * @return a String that contains only the search terms of this query; the + * identifier, dates, file sizes and result cap are not included + */ + @Override + public String toString() { + return "Query[ " + searchTerms + " ]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java b/nifi-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java new
file mode 100644 index 0000000..0079433 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/search/QueryResult.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.search; + +import java.util.Date; +import java.util.List; + +import org.apache.nifi.provenance.ProvenanceEventRecord; + +/** + * The (possibly partial) outcome of a provenance query: the matching events + * plus hit count, timing, expiration, error and progress information. + */ +public interface QueryResult { + + /** + * @return the Provenance events that match the query (up to the limit + * specified in the query) + */ + List<ProvenanceEventRecord> getMatchingEvents(); + + /** + * @return the total number of Provenance Events that hit + */ + long getTotalHitCount(); + + /** + * @return the number of milliseconds the query took to run + */ + long getQueryTime(); + + /** + * @return the date at which this QueryResult will expire + */ + Date getExpiration(); + + /** + * @return If an error occurred while executing the query, this will return the + * serialized error; otherwise, returns <code>null</code> + */ + String getError(); + + /** + * @return an integer between 0 and 100 (inclusive) that indicates what + * percentage of completion the query has reached + */ + int getPercentComplete(); + + /** + * @return Indicates whether or not the
query has finished running + */ + boolean isFinished(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/search/QuerySubmission.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/search/QuerySubmission.java b/nifi-api/src/main/java/org/apache/nifi/provenance/search/QuerySubmission.java new file mode 100644 index 0000000..4716d2d --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/search/QuerySubmission.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.search; + +import java.util.Date; + +/** + * A handle to a provenance query that has been submitted. It exposes the + * query, its (possibly partial) result, the submission metadata and a way to + * cancel the query. + */ +public interface QuerySubmission { + + /** + * @return the query that this submission pertains to + */ + Query getQuery(); + + /** + * @return the {@link QueryResult} for this query.
Note that the result is + * only a partial result if the result of calling + * {@link QueryResult#isFinished()} is <code>false</code> + */ + QueryResult getResult(); + + /** + * @return the date at which this query was submitted + */ + Date getSubmissionTime(); + + /** + * @return the generated identifier for this query result + */ + String getQueryIdentifier(); + + /** + * Cancels the query + */ + void cancel(); + + /** + * @return <code>true</code> if {@link #cancel()} has been called, + * <code>false</code> otherwise + */ + boolean isCanceled(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchTerm.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchTerm.java b/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchTerm.java new file mode 100644 index 0000000..573dbd7 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchTerm.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.provenance.search; + +/** + * A single search criterion: a {@link SearchableField} paired with a String + * value. + */ +public interface SearchTerm { + + /** + * @return the field that this term refers to + */ + SearchableField getSearchableField(); + + /** + * @return the value of this term. NOTE(review): presumably the value to + * match against the field when searching; confirm with the repository + */ + String getValue(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchTerms.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchTerms.java b/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchTerms.java new file mode 100644 index 0000000..aad4d7b --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchTerms.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.provenance.search; + +/** + * Factory for {@link SearchTerm} instances. + */ +public class SearchTerms { + + /** + * Creates a {@link SearchTerm} that simply returns the given field and + * value. Neither argument is validated, so null is passed through as-is. + * The returned term's <code>toString()</code> returns the value only. + * + * @param field the field that the term refers to + * @param value the value of the term + * @return a new SearchTerm wrapping the given field and value + */ + public static SearchTerm newSearchTerm(final SearchableField field, final String value) { + return new SearchTerm() { + @Override + public SearchableField getSearchableField() { + return field; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String toString() { + return getValue(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchableField.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchableField.java b/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchableField.java new file mode 100644 index 0000000..85c6154 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchableField.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.provenance.search; + +/** + * A SearchableField represents a field in a Provenance Event that can be + * searched. + */ +public interface SearchableField { + + /** + * @return the identifier that is used to refer to this field + */ + String getIdentifier(); + + /** + * @return the name of the field that is used when searching the repository + */ + String getSearchableFieldName(); + + /** + * @return the "friendly" name or "display name" of the field, which may be + * more human-readable than the searchable field name + */ + String getFriendlyName(); + + /** + * @return the type of the data stored in this field + */ + SearchableFieldType getFieldType(); + + /** + * @return <code>true</code> if this field represents a FlowFile attribute, + * <code>false</code> if the field represents a Provenance Event detail, + * such as the Source System URI + */ + boolean isAttribute(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchableFieldType.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchableFieldType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchableFieldType.java new file mode 100644 index 0000000..51ece58 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/search/SearchableFieldType.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.search; + +/** + * Enumerates the types of data that a searchable field may hold: a String, + * a date, a data size or a long value. + */ +public enum SearchableFieldType { + + STRING, + DATE, + DATA_SIZE, + LONG; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java new file mode 100644 index 0000000..c3a34b2 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.remote; + +import java.util.concurrent.TimeUnit; + +/** + * A model object for referring to a remote destination (i.e., a Port) for + * site-to-site communications. + */ +public interface RemoteDestination { + + /** + * @return the identifier of the remote destination + */ + String getIdentifier(); + + /** + * @return the human-readable name of the remote destination + */ + String getName(); + + /** + * @param timeUnit the unit of time in which the yield period is to be + * returned + * @return the amount of time that the system should pause sending to a + * particular node if unable to send data to or receive data from this + * endpoint + */ + long getYieldPeriod(TimeUnit timeUnit); + + /** + * @return whether or not compression should be used when transferring data + * to or receiving data from the remote endpoint + */ + boolean isUseCompression(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java b/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java new file mode 100644 index 0000000..b5afe17 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/AbstractReportingTask.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessorInitializationContext; + +/** + * Base class for {@link ReportingTask} implementations. The final + * {@link #initialize(ReportingInitializationContext)} method captures the + * identifier, logger, name, scheduling period and + * {@link ControllerServiceLookup} from the initialization context and then + * delegates to {@link #init(ReportingInitializationContext)}, which + * subclasses may override. + */ +public abstract class AbstractReportingTask extends AbstractConfigurableComponent implements ReportingTask { + + private String identifier; + private String name; + private long schedulingNanos; + private ControllerServiceLookup serviceLookup; + private ComponentLog logger; + + /** + * Stores the values supplied by the given context and then calls + * {@link #init(ReportingInitializationContext)}. + * + * @param config the context from which the identifier, logger, name, + * scheduling period and ControllerServiceLookup are read + * @throws InitializationException if thrown by + * {@link #init(ReportingInitializationContext)} + */ + @Override + public final void initialize(final ReportingInitializationContext config) throws InitializationException { + identifier = config.getIdentifier(); + logger = config.getLogger(); + name = config.getName(); + schedulingNanos = config.getSchedulingPeriod(TimeUnit.NANOSECONDS); + serviceLookup = config.getControllerServiceLookup(); + + init(config); + } + + /** + * @return the {@link ControllerServiceLookup} that was obtained from the + * context passed to the {@link #initialize(ReportingInitializationContext)} method + */ + protected final ControllerServiceLookup getControllerServiceLookup() { + return serviceLookup; + } + + /** + * @return the identifier of this Reporting Task + */ + @Override + public String getIdentifier() { + return identifier; + } + + /** + * @return the name of this Reporting Task + */ + protected String getName() { + return name; + } + + /** + * @param timeUnit the unit of time in which the scheduling period is to be returned + * @return the amount of time that elapses between the moment that this + *
ReportingTask finishes its invocation of + * {@link #onTrigger(ReportingContext)} and the next time that + * {@link #onTrigger(ReportingContext)} is called. + */ + protected long getSchedulingPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(schedulingNanos, TimeUnit.NANOSECONDS); + } + + /** + * Provides a mechanism by which subclasses can perform initialization of + * the Reporting Task before it is scheduled to be run. This default + * implementation does nothing. + * + * @param config the initialization context that was passed to + * {@link #initialize(ReportingInitializationContext)} + * @throws InitializationException if the subclass fails to initialize + */ + protected void init(final ReportingInitializationContext config) throws InitializationException { + } + + /** + * @return the logger that has been provided to the component by the + * framework in its initialize method + */ + protected ComponentLog getLogger() { + return logger; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java b/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java new file mode 100644 index 0000000..fe370ae --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +import java.util.Date; + +/** + * A Bulletin is a construct that represents a message that is to be displayed + * to users to notify of a specific (usually fleeting) event. + */ +public abstract class Bulletin implements Comparable<Bulletin> { + + private final Date timestamp; + private final long id; + private String nodeAddress; + private String level; + private String category; + private String message; + + private String groupId; + private String sourceId; + private String sourceName; + private ComponentType sourceType; + + /** + * Creates a Bulletin with the given id and a timestamp set to the moment + * of construction. + * + * @param id the id of this Bulletin + */ + protected Bulletin(final long id) { + this.timestamp = new Date(); + this.id = id; + } + + public String getNodeAddress() { + return nodeAddress; + } + + public void setNodeAddress(String nodeAddress) { + this.nodeAddress = nodeAddress; + } + + public String getCategory() { + return category; + } + + public void setCategory(String category) { + this.category = category; + } + + public long getId() { + return id; + } + + public String getLevel() { + return level; + } + + public void setLevel(String level) { + this.level = level; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + /** + * @return the time at which this Bulletin was constructed. Note that the + * internal {@link Date} instance itself is returned, not a copy. + */ + public Date getTimestamp() { + return timestamp; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public String getSourceId() { + return sourceId; + } + + public void setSourceId(String sourceId) { + this.sourceId = sourceId; + } + + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + public ComponentType getSourceType() { + return sourceType; + } + + public void setSourceType(ComponentType sourceType) { + this.sourceType = sourceType; + } + + @Override
+ public String toString() { + return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + ", sourceType=" + sourceType + '}'; + } + + /** + * Orders Bulletins by descending id, so that a Bulletin with a larger id + * sorts before one with a smaller id. + * + * @param b the Bulletin to compare against + * @return -1 if <code>b</code> is null; otherwise the negated result of + * comparing this Bulletin's id with the id of <code>b</code> + */ + @Override + public int compareTo(Bulletin b) { + if (b == null) { + return -1; + } + + return -Long.compare(getId(), b.getId()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java b/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java new file mode 100644 index 0000000..7ba2089 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.reporting; + +import java.util.regex.Pattern; + +/** + * A set of criteria (source type, id/name/message patterns, an "after" value + * and a limit) held in final fields and created through the nested + * {@link Builder}. Criteria that were never set on the Builder are null. + */ +public class BulletinQuery { + + private final ComponentType sourceType; + private final Pattern sourceIdPattern; + private final Pattern groupIdPattern; + private final Pattern namePattern; + private final Pattern messagePattern; + private final Long after; + private final Integer limit; + + private BulletinQuery(final Builder builder) { + this.sourceType = builder.sourceType; + this.sourceIdPattern = builder.sourceIdPattern == null ? null : Pattern.compile(builder.sourceIdPattern); + this.groupIdPattern = builder.groupIdPattern == null ? null : Pattern.compile(builder.groupIdPattern); + this.namePattern = builder.namePattern == null ? null : Pattern.compile(builder.namePattern); + this.messagePattern = builder.messagePattern == null ? null : Pattern.compile(builder.messagePattern); + this.after = builder.after; + this.limit = builder.limit; + } + + public ComponentType getSourceType() { + return sourceType; + } + + public Pattern getSourceIdPattern() { + return sourceIdPattern; + } + + public Pattern getGroupIdPattern() { + return groupIdPattern; + } + + public Pattern getNamePattern() { + return namePattern; + } + + public Pattern getMessagePattern() { + return messagePattern; + } + + public Long getAfter() { + return after; + } + + public Integer getLimit() { + return limit; + } + + /** + * Fluent builder for {@link BulletinQuery}. The <code>*Matches</code> + * methods accept regular expression strings, which are stored as given and + * only compiled into {@link Pattern}s when {@link #build()} is called. + */ + public static class Builder { + + private ComponentType sourceType; + private String sourceIdPattern; + private String groupIdPattern; + private String namePattern; + private String messagePattern; + private Long after; + private Integer limit; + + public Builder after(Long after) { + this.after = after; + return this; + } + + public Builder groupIdMatches(String groupId) { + this.groupIdPattern = groupId; + return this; + } + + public Builder sourceIdMatches(String sourceId) { + this.sourceIdPattern = sourceId; + return this; + } + + public Builder sourceType(ComponentType sourceType) { + 
this.sourceType = sourceType; + return this; + } + + public Builder nameMatches(String name) { + this.namePattern = name; + return this; + } + + public Builder messageMatches(String message) { + this.messagePattern = message; + return this; + } + + public Builder limit(Integer limit) { + this.limit = limit; + return this; + } + + /** + * @return a new BulletinQuery reflecting the criteria currently set on + * this Builder + * @throws java.util.regex.PatternSyntaxException if any supplied pattern + * string is not a valid regular expression + */ + public BulletinQuery build() { + return new BulletinQuery(this); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinRepository.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinRepository.java b/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinRepository.java new file mode 100644 index 0000000..2679099 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinRepository.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.reporting; + +import java.util.List; + +/** + * The BulletinRepository provides a place to store and retrieve + * {@link Bulletin}s that have been created by the NiFi Framework and the + * Components that are running within the Framework. + */ +public interface BulletinRepository { + + /** + * Adds a Bulletin to the repository. + * + * @param bulletin the bulletin to add + */ + void addBulletin(Bulletin bulletin); + + /** + * @return the capacity for the number of bulletins for the controller + */ + int getControllerBulletinCapacity(); + + /** + * @return the capacity for the number of bulletins per component + */ + int getComponentBulletinCapacity(); + + /** + * Finds Bulletins that meet the specified query. + * + * @param bulletinQuery indicates which bulletins are of interest + * @return bulletins that meet the query + */ + List<Bulletin> findBulletins(BulletinQuery bulletinQuery); + + /** + * Finds all bulletins for the specified group. + * + * @param groupId id of the group + * @return bulletins for the given group + */ + List<Bulletin> findBulletinsForGroupBySource(String groupId); + + /** + * Finds bulletins for the specified group, bounded by the given maximum.
+ * + * @param groupId id of the group + * @param maxPerComponent max responses wanted; presumably applied per + * component, as the parameter name suggests + * @return bulletins found + */ + List<Bulletin> findBulletinsForGroupBySource(String groupId, int maxPerComponent); + + /** + * @return all bulletins for the controller + */ + List<Bulletin> findBulletinsForController(); + + /** + * Finds bulletins for the controller, bounded by the given maximum. + * + * @param max limits the number of responses + * @return bulletins for the controller + */ + List<Bulletin> findBulletinsForController(int max); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java new file mode 100644 index 0000000..97f3538 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.reporting; + +/** + * An Enumeration for indicating which type of component a {@link Bulletin} is + * associated with. + */ +public enum ComponentType { + + /** + * Bulletin is associated with a Processor + */ + PROCESSOR, + + /** + * Bulletin is associated with a Remote Process Group + */ + REMOTE_PROCESS_GROUP, + + /** + * Bulletin is associated with an Input Port + */ + INPUT_PORT, + + /** + * Bulletin is associated with an Output Port + */ + OUTPUT_PORT, + + /** + * Bulletin is associated with a Reporting Task + */ + REPORTING_TASK, + + /** + * Bulletin is associated with a Controller Service + */ + CONTROLLER_SERVICE, + + /** + * Bulletin is a system-level bulletin, associated with the Flow Controller + */ + FLOW_CONTROLLER; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java new file mode 100644 index 0000000..bdc23c2 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/EventAccess.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; + +import java.io.IOException; +import java.util.List; + +/** + * Provides access to the status of the Controller's components and to + * Provenance Events and the Provenance Event Repository. + */ +public interface EventAccess { + + /** + * @return the status for all components in this Controller + */ + ProcessGroupStatus getControllerStatus(); + + /** + * Convenience method to obtain Provenance Events starting with (and + * including) the given ID. If no event exists with that ID, the first event + * to be returned will have an ID greater than <code>firstEventId</code>. + * + * @param firstEventId the ID of the first event to obtain + * @param maxRecords the maximum number of records to obtain + * @return event records matching query + * @throws java.io.IOException if unable to get records + */ + List<ProvenanceEventRecord> getProvenanceEvents(long firstEventId, final int maxRecords) throws IOException; + + /** + * @return the Provenance Event Repository + */ + ProvenanceEventRepository getProvenanceRepository(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/reporting/InitializationException.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/InitializationException.java b/nifi-api/src/main/java/org/apache/nifi/reporting/InitializationException.java new file mode 100644 index 0000000..c2f21b4 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/reporting/InitializationException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +/** + * A checked exception (it extends {@link Exception}) used to signal that + * initialization of a component failed. + */ +public class InitializationException extends Exception { + + /** + * @param explanation the detail message describing the failure + */ + public InitializationException(String explanation) { + super(explanation); + } + + /** + * @param cause the underlying cause of the failure + */ + public InitializationException(Throwable cause) { + super(cause); + } + + /** + * @param explanation the detail message describing the failure + * @param cause the underlying cause of the failure + */ + public InitializationException(String explanation, Throwable cause) { + super(explanation, cause); + } +}