[GitHub] [nifi] markap14 commented on a change in pull request #4700: NIFI-8060 Added minimal VolatileProvenanceRepository to nifi-stateles…

2020-12-02 Thread GitBox


markap14 commented on a change in pull request #4700:
URL: https://github.com/apache/nifi/pull/4700#discussion_r534466759



##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.repository;
+
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.AsyncLineageSubmission;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.RingBuffer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class StatelessProvenanceRepository implements ProvenanceRepository {
+
+/** Container name constant; declared final so that callers cannot reassign it. */
+public static final String CONTAINER_NAME = "in-memory";
+
+/** Buffer of registered events, created in the constructor with a capacity of {@code maxSize}. */
+private final RingBuffer<ProvenanceEventRecord> ringBuffer;
+/** The event capacity that was passed to the constructor. */
+private final int maxSize;
+
+/** Source of event ids: starts at 0 and is advanced by one for every registered event. */
+private final AtomicLong idGenerator = new AtomicLong(0L);
+
+/**
+ * Creates a repository whose ring buffer is sized to {@code maxEvents}.
+ *
+ * @param maxEvents capacity handed to the underlying ring buffer
+ */
+public StatelessProvenanceRepository(final int maxEvents) {
+    this.maxSize = maxEvents;
+    this.ringBuffer = new RingBuffer<>(maxEvents);
+}
+
+/**
+ * Does nothing: the body is empty and none of the supplied arguments is stored or used.
+ *
+ * @param eventReporter ignored
+ * @param authorizer ignored
+ * @param resourceFactory ignored
+ * @param idLookup ignored
+ * @throws IOException declared by the signature; this implementation never throws it
+ */
+@Override
+public void initialize(final EventReporter eventReporter, final Authorizer authorizer,
+                       final ProvenanceAuthorizableFactory resourceFactory, final IdentifierLookup idLookup) throws IOException {
+    // Nothing to set up.
+}
+
+/**
+ * @return this instance, which is returned as the {@link ProvenanceEventRepository}
+ */
+@Override
+public ProvenanceEventRepository getProvenanceEventRepository() {
+return this;
+}
+
+/**
+ * @return a newly created {@link StandardProvenanceEventRecord.Builder} on every call
+ */
+@Override
+public ProvenanceEventBuilder eventBuilder() {
+return new StandardProvenanceEventRecord.Builder();
+}
+
+/**
+ * Stores one event in the ring buffer, wrapped in an {@code IdEnrichedProvEvent} that
+ * carries the next value taken from the id generator.
+ *
+ * @param event the event to record
+ */
+@Override
+public void registerEvent(final ProvenanceEventRecord event) {
+    final IdEnrichedProvEvent enriched = new IdEnrichedProvEvent(event, idGenerator.getAndIncrement());
+    ringBuffer.add(enriched);
+}
+
+/**
+ * Registers every event, in iteration order, by delegating to
+ * {@link #registerEvent(ProvenanceEventRecord)}.
+ *
+ * @param events the events to record; declared with its element type so that the
+ *               typed loop variable below is valid
+ */
+@Override
+public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
+    for (final ProvenanceEventRecord event : events) {
+        registerEvent(event);
+    }
+}
+
+/**
+ * Convenience overload that calls {@link #getEvents(long, int, NiFiUser)} with a
+ * {@code null} user.
+ *
+ * @param firstRecordId lowest event id to include
+ * @param maxRecords maximum number of events to return
+ * @return the events selected by the three-argument overload
+ * @throws IOException propagated from the three-argument overload
+ */
+@Override
+public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords) throws IOException {
+    return getEvents(firstRecordId, maxRecords, null);
+}
+
+/**
+ * Returns up to {@code maxRecords} buffered events whose event id is greater than or
+ * equal to {@code firstRecordId}.
+ *
+ * @param firstRecordId lowest event id to include
+ * @param maxRecords maximum number of events requested from the ring buffer
+ * @param user not consulted by this implementation
+ * @return the events selected from the ring buffer
+ * @throws IOException declared by the signature; nothing in this body throws it
+ */
+@Override
+public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException {
+    // The filter must carry its type argument; otherwise select(ProvenanceEventRecord) does not override anything.
+    return ringBuffer.getSelectedElements(new RingBuffer.Filter<ProvenanceEventRecord>() {
+        @Override
+        public boolean select(final ProvenanceEventRecord value) {
+            return value.getEventId() >= firstRecordId;
+        }
+    }, maxRecords);
+}
+
+/**
+ * Reports the event id of the newest element in the ring buffer.
+ *
+ * @return that event id, or {@code null} when the ring buffer has no newest element
+ */
+@Override
+public Long getMaxEventId() {
+    final ProvenanceEventRecord latest = ringBuffer.getNewestElement();
+    if (latest == null) {
+        return null;
+    }
+    return latest.getEventId();
+}
+
+/**
+ * Looks up a buffered event whose FlowFile UUID equals {@code identifier}.
+ *
+ * @param identifier the FlowFile UUID to match; must not be {@code null}, because
+ *                   {@code equals} is invoked on it
+ * @return a matching event, or {@code null} when the ring buffer selects none
+ * @throws IOException declared by the signature; nothing in this body throws it
+ */
+public ProvenanceEventRecord getEvent(final String identifier) throws IOException {
+    // At most one element is requested, so the result list is either empty or a single match.
+    final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new RingBuffer.Filter<ProvenanceEventRecord>() {
+        @Override
+        public boolean select(final ProvenanceEventRecord event) {
+            return identifier.equals(event.getFlowFileUuid());
+        }
+    }, 1);
+    return records.isEmpty() ? null : records.get(0);
+}
+
+@Override
+public ProvenanceEventRecord getEvent(final long 

[GitHub] [nifi] markap14 commented on a change in pull request #4700: NIFI-8060 Added minimal VolatileProvenanceRepository to nifi-stateles…

2020-12-02 Thread GitBox


markap14 commented on a change in pull request #4700:
URL: https://github.com/apache/nifi/pull/4700#discussion_r534455443



##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java
##
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.repository;
+
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.AsyncLineageSubmission;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.RingBuffer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class VolatileProvenanceRepository implements ProvenanceRepository {
+
+// default property values
+// NOTE(review): presumably the ring-buffer capacity used when none is supplied - confirm against the constructors.
+public static final int DEFAULT_BUFFER_SIZE = 1;
+
+// NOTE(review): public and non-final, so any caller can reassign it - confirm whether this should be a constant.
+public static String CONTAINER_NAME = "in-memory";
+
+// NOTE(review): declared as a raw type; the element type is not visible in this excerpt.
+private final RingBuffer ringBuffer;
+private final int maxSize;
+
+// Counter initialised to 0.
+private final AtomicLong idGenerator = new AtomicLong(0L);
+// Flag initialised to false; where it is set lies outside this excerpt.
+private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+/**
+ * Default no args constructor for service loading only
+ */
+public VolatileProvenanceRepository() {

Review comment:
   Given the usage pattern (this class will only be used in stateless NiFi, 
and it is not loaded via the service loader), I don't think we even need the 
no-arg constructor.

##
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java
##
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.repository;
+
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.AsyncLineageSubmission;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import