Robert,

Thanks for reporting this, and for all the details! The good news is that it
is easy to replicate. The better news is that it was pretty easy to fix :)
I have created NIFI-5879 [1] to track this issue, and there is a Pull Request
up for it now.

Thanks!
-Mark



[1] https://issues.apache.org/jira/browse/NIFI-5879


> On Dec 6, 2018, at 10:04 AM, Robert Creese <[email protected]> wrote:
> 
> I am using NiFi 1.6 and get the following errors when trying to modify a
> clone of an incoming FlowFile:
> 
> [1]"unable to find content for FlowFile: ... MissingFlowFileException
> ...
> Caused by ContentNotFoundException: Could not find content for StandardClaim
> ...
> Caused by java.io.EOFException: null"
> 
> [2]"FlowFileHandlingException: StandardFlowFileRecord... is not known in this 
> session"
> 
> The first error occurs when trying to access the contents of the FlowFile;
> the second occurs when removing the FlowFile from the session (within the
> catch block for the first). This process is known to have worked under
> NiFi 0.7.
> 
> The basic process is:
>    1) Clone the incoming FlowFile
>    2) Write to the clone
>    3) Write to the clone again (some additional formatting)
>    4) Repeat steps 1-3
> The error occurs in step 3 of the second iteration.
> 
> Interestingly, if a session.read of the clone is done immediately after the
> clone is performed, everything works fine. The read seems to reset some
> pointer.
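The immediate read described above drains the clone's content once, which readFlowFile in the listing does with java.util.Scanner and the "\\A" delimiter (a pattern that matches only at the start of input, so the whole stream becomes a single token). Below is a minimal JDK-only sketch of that idiom, assuming nothing from NiFi; StreamDrain and drain are hypothetical names. One detail worth knowing: on empty content Scanner.next() throws NoSuchElementException, so the sketch checks hasNext() first.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/** Hypothetical helper: reads a whole stream into a String, as readFlowFile does. */
final class StreamDrain {

    private StreamDrain() {
    }

    static String drain(final InputStream in) {
        // Closing the Scanner also closes the underlying stream, as in readFlowFile.
        try (Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name())) {
            // "\\A" only matches at the beginning of input, so the first token is everything.
            scanner.useDelimiter("\\A");
            // Guard against empty content: next() would throw NoSuchElementException.
            return scanner.hasNext() ? scanner.next() : "";
        }
    }

    public static void main(final String[] args) {
        final byte[] content = "{\"name\": \"name 1\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(drain(new ByteArrayInputStream(content)));
        System.out.println(drain(new ByteArrayInputStream(new byte[0])).isEmpty());
    }
}
```

Returning an empty string rather than throwing keeps the helper usable on zero-length FlowFiles; the original readFlowFile calls next() directly and would fail on such input.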
> 
> I have created unit tests for this processor, but they do not fail in either 
> case.
> 
> Below is code, simplified from the actual version in use, that demonstrates
> the issue. (The development system is not connected, so I had to copy the
> code. Please forgive any typos; it should be close. That is also why a full
> stack trace is not provided.) The processor doing the work has a property
> that determines whether an immediate read is done, so both scenarios can be
> performed easily. To set it up, all that is needed is a GetFile processor to
> supply the input and terminators for the output of the
> SampleCloningProcessor. A sample input file is included as well. The meat of
> the code is in the onTrigger and manipulate methods. The manipulation in this
> simplified version doesn't really do anything but copy the input to the
> output.
> 
> Any insights into why this is happening and suggestions for corrections will 
> be appreciated - thanks.
> 
> ########### SampleCloningProcessor.java ################
> 
> package sample.processors.cloning;
> 
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.OutputStream;
> import java.io.Reader;
> import java.util.Arrays;
> import java.util.HashSet;
> import java.util.List;
> import java.util.Scanner;
> import java.util.Set;
> 
> import org.apache.commons.compress.utils.IOUtils;
> 
> import org.apache.nifi.annotation.documentation.CapabilityDescription;
> import org.apache.nifi.annotation.documentation.Tags;
> import org.apache.nifi.components.PropertyDescriptor;
> import org.apache.nifi.flowfile.FlowFile;
> import org.apache.nifi.processor.AbstractProcessor;
> import org.apache.nifi.processor.ProcessContext;
> import org.apache.nifi.processor.ProcessSession;
> import org.apache.nifi.processor.ProcessorInitializationContext;
> import org.apache.nifi.processor.Relationship;
> import org.apache.nifi.processor.exception.ProcessException;
> import org.apache.nifi.processor.io.InputStreamCallback;
> import org.apache.nifi.processor.io.OutputStreamCallback;
> import org.apache.nifi.processor.io.StreamCallback;
> import org.apache.nifi.processor.util.StandardValidators;
> 
> import com.google.gson.Gson;
> 
> @Tags({"example", "clone"})
> @CapabilityDescription("Demonstrates a FlowFile cloning failure.")
> public class SampleCloningProcessor extends AbstractProcessor {
> 
>     /* Determines if an immediate read is performed after cloning the incoming FlowFile. */
>     public static final PropertyDescriptor IMMEDIATE_READ = new PropertyDescriptor.Builder()
>         .name("immediateRead")
>         .description("Determines if the processor runs successfully. If a read is done immediately "
>             + "after the clone of the incoming FlowFile, then the processor should run successfully.")
>         .required(true)
>         .allowableValues("true", "false")
>         .defaultValue("true")
>         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
>         .build();
> 
>     public static final Relationship SUCCESS = new Relationship.Builder()
>         .name("success")
>         .description("No unexpected errors.")
>         .build();
> 
>     public static final Relationship FAILURE = new Relationship.Builder()
>         .name("failure")
>         .description("Errors were thrown.")
>         .build();
> 
>     private Set<Relationship> relationships;
>     private List<PropertyDescriptor> properties;
> 
>     @Override
>     public void init(final ProcessorInitializationContext context) {
>         relationships = new HashSet<>(Arrays.asList(SUCCESS, FAILURE));
>         properties = Arrays.asList(IMMEDIATE_READ);
>     }
> 
>    @Override
>    public Set<Relationship> getRelationships() {
>        return this.relationships;
>    }
> 
>    @Override
>    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
>        return this.properties;
>    }
> 
>     @Override
>     public void onTrigger(final ProcessContext context, final ProcessSession session)
>             throws ProcessException {
>         final FlowFile incomingFlowFile = session.get();
> 
>         if (incomingFlowFile == null) {
>             return;
>         }
> 
>         try {
>             final InfileReader inFileReader = new InfileReader();
>             session.read(incomingFlowFile, inFileReader);
>             final Product product = inFileReader.getProduct();
>             boolean transfer = false;
> 
>             getLogger().info("\tSession   :\n" + session);
>             getLogger().info("\toriginal  :\n" + incomingFlowFile);
> 
>             for (int i = 0; i < 2; i++) {
>                 transfer = manipulate(context, session, incomingFlowFile, product);
>             }
> 
>             // Every FlowFile taken from the session must be transferred or removed
>             // before the session commits, so drop the original once it has been cloned.
>             session.remove(incomingFlowFile);
>         } catch (Exception e) {
>             getLogger().error(e.getMessage(), e);
>             session.rollback(true);
>         }
>     }
> 
>     private boolean manipulate(final ProcessContext context, final ProcessSession session,
>             final FlowFile incomingFlowFile, final Product product) {
> 
>         boolean transfer = false;
>         FlowFile outgoingFlowFile = null;
>         final boolean immediateRead = context.getProperty(IMMEDIATE_READ).asBoolean();
>         try {
>             // Clone the incoming FlowFile
>             outgoingFlowFile = session.clone(incomingFlowFile);
>             getLogger().info("\tclone outgoing :\n" + outgoingFlowFile);
>             if (immediateRead) {
>                 readFlowFile(session, outgoingFlowFile);
>             }
> 
>             // First write into the clone
>             final StageOneWriter stage1Writer = new StageOneWriter(product);
>             outgoingFlowFile = session.write(outgoingFlowFile, stage1Writer);
>             getLogger().info("\twrite outgoing :\n" + outgoingFlowFile);
> 
>             // Format the cloned FlowFile with another write
>             outgoingFlowFile = formatFlowFile(session, outgoingFlowFile);
>             getLogger().info("\tformat outgoing :\n" + outgoingFlowFile);
>             session.transfer(outgoingFlowFile, SUCCESS);
>             transfer = true;
>         } catch (Exception e) {
>             getLogger().error(e.getMessage(), e);
>             if (outgoingFlowFile != null) {
>                 session.remove(outgoingFlowFile);
>             }
>         }
>         return transfer;
>     }
> 
>     private void readFlowFile(final ProcessSession session, final FlowFile flowFile) {
>         session.read(flowFile, new InputStreamCallback() {
>             @Override
>             public void process(final InputStream in) throws IOException {
>                 // Consume the whole content; "\\A" makes the entire stream a single token.
>                 try (Scanner scanner = new Scanner(in)) {
>                     scanner.useDelimiter("\\A").next();
>                 }
>             }
>         });
>     }
> 
>     private FlowFile formatFlowFile(final ProcessSession session, final FlowFile flowFile) {
>         final OutputFormatWriter formatWriter = new OutputFormatWriter();
>         return session.write(flowFile, formatWriter);
>     }
> 
>     private static class OutputFormatWriter implements StreamCallback {
>         @Override
>         public void process(final InputStream in, final OutputStream out) throws IOException {
>             try {
>                 IOUtils.copy(in, out);
>                 out.flush();
>             } finally {
>                 IOUtils.closeQuietly(in);
>                 IOUtils.closeQuietly(out);
>             }
>         }
>     }
> 
>    private static class StageOneWriter implements OutputStreamCallback {
> 
>        private Product product = null;
> 
>        public StageOneWriter(final Product product) {
>            this.product = product;
>        }
> 
>        @Override
>        public void process(final OutputStream out) throws IOException {
>            final Gson gson = new Gson();
>            final String json = gson.toJson(product);
>            out.write(json.getBytes());
>        }
>    }
> 
>     private static class InfileReader implements InputStreamCallback {
> 
>         private Product product = null;
> 
>         @Override
>         public void process(final InputStream in) throws IOException {
>             product = null;
>             final Gson gson = new Gson();
>             final Reader inReader = new InputStreamReader(in, "UTF-8");
>             product = gson.fromJson(inReader, Product.class);
>         }
> 
>         public Product getProduct() {
>             return product;
>         }
>     }
> }
> 
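Because this simplified version only passes the content through, OutputFormatWriter.process() amounts to a buffered copy from the input stream to the output stream. For readers without commons-compress on the classpath, a JDK-only sketch of the same copy might look like the following; PassThrough is a hypothetical name, and unlike OutputFormatWriter it leaves closing the streams to the caller.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical JDK-only equivalent of the copy in OutputFormatWriter.process(). */
final class PassThrough {

    private PassThrough() {
    }

    /** Copies every byte from in to out and returns the number of bytes copied. */
    static long copy(final InputStream in, final OutputStream out) throws IOException {
        final byte[] buffer = new byte[8192];
        long total = 0;
        int n;
        // read() returns -1 at end of stream; each chunk is written as soon as it is read.
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            total += n;
        }
        out.flush();
        return total;
    }

    public static void main(final String[] args) throws IOException {
        final byte[] content = "{\"name\": \"name 1\"}".getBytes(StandardCharsets.UTF_8);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final long copied = copy(new ByteArrayInputStream(content), out);
        System.out.println(copied + " bytes: " + out.toString("UTF-8"));
    }
}
```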
> ############ SampleCloningProcessorTest.java ###########################
> 
> 
> package sample.processors.cloning;
> 
> import org.apache.nifi.util.TestRunner;
> import org.apache.nifi.util.TestRunners;
> import org.junit.Before;
> import org.junit.Test;
> 
> public class SampleCloningProcessorTest {
> 
>     static final String flowFileContent = "{"
>         + "\"cost\": \"cost 1\","
>         + "\"description\": \"description 1\","
>         + "\"markup\": 1.2,"
>         + "\"name\": \"name 1\","
>         + "\"supplier\": \"supplier 1\""
>         + "}";
> 
>    private TestRunner testRunner;
> 
>    @Before
>    public void init() {
>        testRunner = TestRunners.newTestRunner(SampleCloningProcessor.class);
>        testRunner.enqueue(flowFileContent);
>    }
> 
>    @Test
>    public void testProcessorImmediateRead() {
>        testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "true");
>        testRunner.run();
>        testRunner.assertTransferCount("success", 2);
>    }
> 
> 
>    @Test
>    public void testProcessorImmediateRead_false() {
>        testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "false");
>        testRunner.run();
>        testRunner.assertTransferCount("success", 2);
>    }
> }
> 
> 
> ########## Product.java ################
> 
> package sample.processors.cloning;
> 
> public class Product {
> 
>    private String name;
>    private String description;
>    private String supplier;
>    private String cost;
>    private float markup;
> 
>    public String getName() {
>        return name;
>    }
> 
>    public void setName(final String name) {
>        this.name = name;
>    }
> 
>    public String getDescription() {
>        return description;
>    }
> 
>    public void setDescription(final String description) {
>        this.description = description;
>    }
> 
>    public String getSupplier() {
>        return supplier;
>    }
> 
>    public void setSupplier(final String supplier) {
>        this.supplier = supplier;
>    }
> 
>    public String getCost() {
>        return cost;
>    }
> 
>    public void setCost(final String cost) {
>        this.cost = cost;
>    }
> 
>    public float getMarkup() {
>        return markup;
>    }
> 
>    public void setMarkup(final float markup) {
>        this.markup = markup;
>    }
> }
> 
> ########## product.json  - input file ##################
> 
> {
>    "cost" : "cost 1",
>    "description" : "description 1",
>    "markup" : 1.2,
>    "name" : "name 1",
>    "supplier" : "supplier 1"
> }
