Robert, Thanks for reporting this - and for all the details! The good news is that it is easy to replicate. The better news is that it was pretty easy to fix :) I have created NIFI-5879 [1] to track this issue, and there is a Pull Request up for it now.
Thanks! -Mark [1] https://issues.apache.org/jira/browse/NIFI-5879 > On Dec 6, 2018, at 10:04 AM, Robert Creese <[email protected]> wrote: > > > > I am using nifi 1.6 and get the following errors when trying to modify a > clone of an incoming flowFile: > > [1]"unable to find content for FlowFile: ... MissingFlowFileException > ... > Caused by ContentNotFoundException: Could not find content for StandardClaim > ... > Caused by java.io.EOFException: null" > > [2]"FlowFileHandlingException: StandardFlowFileRecord... is not known in this > session" > > The first error occurs when trying to access the contents of the flow file, > the second when removing the flow file from the session (within a catch of > the first). This process is known to have worked under nifi 0.7. > > The basic process is: > 1) Clone the incoming flow file > 2) Write to the clone > 3) Write to the clone again (some additional formatting) > 4) Repeat 1-3 > The error occurs on the second iteration step 3. > > An interesting point is that if immediately after the clone is performed, a > session.read of the clone is done, everything works fine. The read seems to > reset some pointer. > > I have created unit tests for this processor, but they do not fail in either > case. > > Below is code simplified from the actual version in use that demonstrates the > issue. (The development system is not connected so I had to copy the code. > Please forgive any typos - it should be close. That is also why a full stack > trace is not provided.) The processor doing the work has a property to > determine if an immediate read should be done, or not. So both scenarios can > be performed easily. To set it up, all that is needed is a GetFile processor > to supply the input and terminators for the output from the > SampleCloningProcessor. A sample input file is included as well. The meat of > the code is in the onTrigger and manipulate methods. 
The manipulation in this > simplified version really don't do anything but copy the input to the output. > > Any insights into why this is happening and suggestions for corrections will > be appreciated - thanks. > > ########### SampleCloningProcessor.java ################ > > processor sample.package.cloning > > import java.io.IOException; > import java.io.InputStream; > import java.io.InputStreamReader; > import java.io.OutputStream; > import java.io.Reader; > import java.util.Arrays; > import java.util.Hashset; > import java.util.List; > import java.util.Scanner; > import java.util.Set; > > import org.apache.commons.compress.utils.IOUtils; > > import org.apache.nifi.annotation.documentaion.CapabilityDescription; > import org.apache.nifi.annotation.documentaion.Tags; > import org.apache.nifi.componets.PropertyDescriptor; > import org.apache.nifi.flowfile.FlowFile; > import org.apache.nifi.processor.AbstractProcessor; > import org.apache.nifi.processor.ProcessorContext; > import org.apache.nifi.processor.ProcessorSession; > import org.apache.nifi.processor.ProcessorInitioalizationContext; > import org.apache.nifi.processor.Relationship; > import org.apache.nifi.processor.exception.ProcessException; > import org.apache.nifi.processor.io.InputStreamCalback; > import org.apache.nifi.processor.io.OutputStreamCalback; > import org.apache.nifi.processor.io.StreamCalback; > import org.apache.nifi.processor.util.StandardValidators; > > import com.google.gson.Gson; > > @Tags({"example", "clone"}) > @CapabilityDescription("Demonsrates cloning of flowfile failure.") > public class SampleCloningProcessor extend AbstractProcessor { > > /* Determines if an immediate read is performed after cloning of inoming > flowfile. */ > public static final PropertyDescriptor IMMEDIATE_READ = new > PropertyDescriptor.Builder() > .name("immediateRead") > .description("Determines if processor runs successfully. 
If a read is > done immediatly " > + "after the clone of the incoming flowFile, then the processor > should run successfully.") > .required(true) > .allowableValues("true", "false") > .defaultValue("true") > .addValidator(StandardValidators.BOLLEAN_VALIDATOR) > .build(); > > public static final Relationship SUCCESS = new > Relationship.Builder().name("success"). > description("No unexpected errors.").build(); > > public static final Relationship FAILURE = new > Relationship.Builder().name("failure"). > description("Errors were thrown.").build(); > > private Set<Relationship> relationships; > private List<PropertyDescriptors> properties; > > @Override > public void init(final ProcessorInitializationContext contex) { > relationships = new HashSet<>(Arrays.asList(SUCCESS, FAILURE)); > properties = new Arrays.asList(IMMEDIATE_READ); > } > > @Override > public Set<Relationship> getRelationships() { > return this.relationships; > } > > @Override > public List<PropertyDescriptor> getSuppprtedPropertyDescriptors() { > return this.properties; > } > > @Override > public void onTrigger(final ProcessContext context, final ProcessSession > session) throws ProcessException { > FlowFile incomingFlowFile = session.get(); > > if (incomingFlowFile == null) { > return; > } > > try { > final InfileReader inFileReader = new InfileReader(); > session.read(incomingFlowFile, inFileReader); > Product product = infileReader.getProduct(); > boolean transfer = false; > > getLogger().info("\tSession :\n" + session); > getLogger().info("\toriginal :\n" + incomingFlowFile); > > for(int i = 0; i < 2; i++) { > transfer = manipulate(context, session, inclmingFlowFile, > product); > } > } catch (Exception e) { > getLogger().error(e.getMessage(), e); > session.rollback(true); > } > } > > private boolean manipuate(final ProcessContext context, final > ProcessSession session > final FlowFile incomingFlowFile, final Product product) { > > boolean transfer = false; > FlowFile outgoingFlowFile = null; > 
boolean immediateRead = > context.getProperty(IMMEDIATE_READ).asBoolean(); > try { > //Clone incoming flowFile > outgoinFlowFile = session.clone(incomingFlowFile); > getLogger().info("\tclone outgoing :\n" + outgoingFlowFile); > if(immediateRead) { > readFlowFile(session, outgoingFlowFile); > } > > //First write into clone > StageOneWrite stage1Write = new StaeOneWrite(product); > outgoingFlowFile = session.write(outgoingFlowFile, stage1Write); > getLogger().info("\twrite outgoing :\n" + outgoingFlowFile); > > // Format the cloned file with another write > outgoingFlowFile = formatFlowFile(outgoingFlowFile, session) > getLogger().info("\format outgoing :\n" + outgoingFlowFile); > session.transfer(outgoingFlowFile, SUCCESS); > transfer != true; > } catch(Exception e) > getLogger().error(e.getMessage(), e); > if(outgoingFlowFile ! = null) { > session.remove(outgoingFlowFile); > } > } > return transfer; > } > > private void readFlowFile(fainl ProcessSession session, fianl Flowfile > flowFile) { > session.read(flowFile, new InputStreamCallback() { > @Override > public void process(Final InputStream in) throws IOException { > try (Scanner scanner = new Scanner(in)) { > scanner.useDelimiter("\\A").next(); > } > } > }); > } > > private FlowFile formatFlowFile(fainl ProcessSession session, FlowFile > flowfile) { > OutputFormatWrite formatWrite = new OutputFormatWriter(); > flowfile = session.write(flowFile, formatWriter); > return flowFile; > } > > private static class OutputFormatWriter implement StreamCallback { > @Override > public void process(final InputStream in, final OutputStream out) > throws IPException { > try { > IOUtils.copy(in. 
out); > out.flush(); > } finally { > IOUtils.closeQuietly(in); > IOUtils.closeQuietly(out); > } > } > } > > private static class StageOneWriter implements OutputStreamCallback { > > private Product product = null; > > public StageOneWriter(Produt product) { > this.product = product; > } > > @Override > public void process(final OutputStream out) throws IOException { > final Gson gson = new Gson(); > final String json = gson.toJson(product); > out.write(json.getBytes()); > } > } > > private static class InfileReader implements InputStreamCallback { > > private Product product = null; > > public StageOneWriter(Produt product) { > this.product = product; > } > > @Override > public void process(final InputStream out) throws IOException { > product = null; > final Gson gson = new Gson(); > Reader inReader = new InputStreamReader(in, "UTF-8"); > product = gson.fromJson(inreader, Product.calss); > } > > public Product getProduct() { > return product; > } > } > > > ############ SampleCloningProcessorTest.java ########################### > > > package sample.processors.cloning; > > import org.apache.nifi.util.TestRunner; > import org.apache.nifi.util.TestRunners; > import org.junit.Before; > import org.junit.Test; > > public class SampleCloningProcessorTest { > > final satatic String flowFileContent = "{" > + "\"cost\": \"cost 1\"," > + "\"description\": \"description"," > + "\"markup\": 1.2" > + "\"name\":\"name 1\"," > + "\"supplier\":\"supplier 1\"," > + "}"; > > private TestRunner testRunner; > > @Before > public void init() { > testRunner = TestRunner.newTestRunner(SampleCloningProcessor.class); > testRunner.enqueue(flowFileContent); > } > > @Test > public void testProcessorImmediateRead() { > testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "true"); > testRunner.run(); > testRinner.assertTransferCount("success", 2); > } > > > @Test > public void testProcessorImmediateRead_false() { > testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "false"); 
> testRunner.run(); > testRinner.assertTransferCount("success", 2); > } > } > > > ########## Product.java ################ > > package sample.processors.cloning; > > public class Product { > > private String name; > private String description; > private String supplier; > private String cost; > private float markup; > > public String getName() { > return name; > } > > public void setName(final String name) { > this.name = name; > } > > public String getDescription() { > return description; > } > > public void setDescriptione(final String description) { > this.description = description; > } > > public String getSupplier() { > return supplier; > } > > public void setSupplier(final String supplier) { > this.supplier = supplier; > } > > public String getCost() { > return cost; > } > > public void setCost(final String cost) { > this.cost = cost; > } > > public float getMarkup() { > return markup; > } > > public void setMarkup(final float name) { > this.markup = markup; > } > > > ########## product.json - input file ################## > > { > "const" : "cost 1", > "description" : "description 1", > "markup" : 1.2, > "name" : "name 1", > "supplier" : "supplier 1" > }
