[GitHub] [nifi] markap14 commented on a change in pull request #4700: NIFI-8060 Added minimal VolatileProvenanceRepository to nifi-stateles…
markap14 commented on a change in pull request #4700: URL: https://github.com/apache/nifi/pull/4700#discussion_r534466759 ## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java ## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stateless.repository; + +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.AsyncLineageSubmission; +import org.apache.nifi.provenance.IdentifierLookup; +import org.apache.nifi.provenance.ProvenanceAuthorizableFactory; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.util.RingBuffer; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +public class StatelessProvenanceRepository implements ProvenanceRepository { + +public static String CONTAINER_NAME = "in-memory"; + +private final RingBuffer ringBuffer; +private final int maxSize; + +private final AtomicLong idGenerator = new AtomicLong(0L); + +public StatelessProvenanceRepository(final int maxEvents) { +maxSize = maxEvents; +ringBuffer = new RingBuffer<>(maxSize); +} + +@Override +public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory, + final IdentifierLookup idLookup) throws IOException { + +} + +@Override +public ProvenanceEventRepository getProvenanceEventRepository() { +return this; +} + +@Override +public ProvenanceEventBuilder eventBuilder() { +return new StandardProvenanceEventRecord.Builder(); +} + +@Override +public void registerEvent(final ProvenanceEventRecord event) { +final long id = idGenerator.getAndIncrement(); +ringBuffer.add(new IdEnrichedProvEvent(event, id)); +} + +@Override +public void registerEvents(final Iterable events) { +for (final ProvenanceEventRecord event : events) { +registerEvent(event); +} +} + +@Override +public List getEvents(final long firstRecordId, final int maxRecords) throws IOException { +return getEvents(firstRecordId, maxRecords, null); +} + +@Override +public List getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException { +return ringBuffer.getSelectedElements(new RingBuffer.Filter() { +@Override +public boolean select(final ProvenanceEventRecord value) { +return value.getEventId() >= firstRecordId; +} +}, maxRecords); +} + +@Override +public Long getMaxEventId() { +final ProvenanceEventRecord newest = ringBuffer.getNewestElement(); +return (newest == null) ? null : newest.getEventId(); +} + +public ProvenanceEventRecord getEvent(final String identifier) throws IOException { +final List records = ringBuffer.getSelectedElements(new RingBuffer.Filter() { +@Override +public boolean select(final ProvenanceEventRecord event) { +return identifier.equals(event.getFlowFileUuid()); +} +}, 1); +return records.isEmpty() ? null : records.get(0); +} + +@Override +public ProvenanceEventRecord getEvent(final long
[GitHub] [nifi] markap14 commented on a change in pull request #4700: NIFI-8060 Added minimal VolatileProvenanceRepository to nifi-stateles…
markap14 commented on a change in pull request #4700: URL: https://github.com/apache/nifi/pull/4700#discussion_r534455443 ## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java ## @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stateless.repository; + +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.AsyncLineageSubmission; +import org.apache.nifi.provenance.IdentifierLookup; +import org.apache.nifi.provenance.ProvenanceAuthorizableFactory; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.util.RingBuffer; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class VolatileProvenanceRepository implements ProvenanceRepository { + +// default property values +public static final int DEFAULT_BUFFER_SIZE = 1; + +public static String CONTAINER_NAME = "in-memory"; + +private final RingBuffer ringBuffer; +private final int maxSize; + +private final AtomicLong idGenerator = new AtomicLong(0L); +private final AtomicBoolean initialized = new AtomicBoolean(false); + +/** + * Default no args constructor for service loading only + */ +public VolatileProvenanceRepository() { Review comment: Given the usage pattern, that this will only be used in stateless, and it's not loaded via the service loader, I don't think we even need the no-arg constructor. ## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java ## @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stateless.repository; + +import org.apache.nifi.authorization.Authorizer; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.AsyncLineageSubmission; +import org.apache.nifi.provenance.IdentifierLookup; +import org.apache.nifi.provenance.ProvenanceAuthorizableFactory; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import