I am using NiFi 1.6 and get the following errors when trying to modify a clone 
of an incoming flowFile:

[1]"unable to find content for FlowFile: ... MissingFlowFileException
...
Caused by ContentNotFoundException: Could not find content for StandardClaim
...
Caused by java.io.EOFException: null"

[2]"FlowFileHandlingException: StandardFlowFileRecord... is not known in this 
session"

The first error occurs when trying to access the contents of the flow file, the 
second when removing the flow file from the session (within the catch for the 
first). This process is known to have worked under NiFi 0.7.

The basic process is:
    1) Clone the incoming flow file
    2) Write to the clone
    3) Write to the clone again (some additional formatting)
    4) Repeat 1-3
The error occurs at step 3 of the second iteration.

An interesting point: if a session.read of the clone is done immediately after 
the clone is created, everything works fine. The read seems to reset some 
pointer.
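
To make the "immediate read" concrete outside NiFi: it simply drains the whole 
content as one token and discards it. Below is a minimal standalone sketch of 
that drain, using only the JDK. The class and method names (DrainSketch, drain) 
are hypothetical and not part of the processor, and the hasNext() guard is an 
addition, since next() throws NoSuchElementException on empty content.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

// Hypothetical helper for illustration; not part of the processor.
class DrainSketch {

    // Reads the whole stream as a single token. "\\A" matches only at the
    // start of input, so the first token runs to the end of the stream.
    // Closing the Scanner also closes the underlying stream.
    static String drain(final InputStream in) {
        try (Scanner scanner = new Scanner(in, StandardCharsets.UTF_8.name())) {
            scanner.useDelimiter("\\A");
            // hasNext() is false for empty content; next() would throw then.
            return scanner.hasNext() ? scanner.next() : "";
        }
    }

    public static void main(final String[] args) {
        final byte[] content = "{\"name\":\"name 1\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(drain(new ByteArrayInputStream(content)));
    }
}
```

This only shows what the read does to the stream it is handed; it does not 
explain why doing it changes the behaviour of the later writes.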

I have created unit tests for this processor, but they do not fail in either 
case.

Below is code, simplified from the actual version in use, that demonstrates the 
issue. (The development system is not connected, so I had to copy the code by 
hand. Please forgive any typos - it should be close. That is also why a full 
stack trace is not provided.) The processor doing the work has a property that 
determines whether an immediate read is done, so both scenarios can be run 
easily. To set it up, all that is needed is a GetFile processor to supply the 
input and terminators for the output from the SampleCloningProcessor. A sample 
input file is included as well. The meat of the code is in the onTrigger and 
manipulate methods. The manipulation in this simplified version doesn't really 
do anything but copy the input to the output.
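
For anyone reading without commons-compress at hand, the copy done by the 
formatting step amounts to the loop below. This is a minimal JDK-only sketch 
under my own assumptions: CopySketch, copy and roundTrip are hypothetical names, 
and unlike the processor's callback it does not close either stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the copy step; not part of the processor.
class CopySketch {

    // Copies everything from in to out and returns the number of bytes copied.
    // Neither stream is closed here; that is left to whoever opened them.
    static long copy(final InputStream in, final OutputStream out) throws IOException {
        final byte[] buffer = new byte[8192];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        out.flush();
        return total;
    }

    // Convenience wrapper for in-memory data, used by main below.
    static byte[] roundTrip(final byte[] input) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            copy(new ByteArrayInputStream(input), out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(final String[] args) {
        final byte[] json = "{\"name\":\"name 1\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(roundTrip(json), StandardCharsets.UTF_8));
    }
}
```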

Any insights into why this is happening and suggestions for corrections will be 
appreciated - thanks.

########### SampleCloningProcessor.java ################

package sample.processors.cloning;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Scanner;
import java.util.Set;

import org.apache.commons.compress.utils.IOUtils;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

import com.google.gson.Gson;

@Tags({"example", "clone"})
@CapabilityDescription("Demonstrates cloning of flowfile failure.")
public class SampleCloningProcessor extends AbstractProcessor {

    /* Determines if an immediate read is performed after cloning of incoming flowfile. */
    public static final PropertyDescriptor IMMEDIATE_READ = new PropertyDescriptor.Builder()
        .name("immediateRead")
        .description("Determines if processor runs successfully. If a read is done immediately "
            + "after the clone of the incoming flowFile, then the processor should run successfully.")
        .required(true)
        .allowableValues("true", "false")
        .defaultValue("true")
        .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
        .build();
        
    public static final Relationship SUCCESS = new Relationship.Builder()
        .name("success")
        .description("No unexpected errors.")
        .build();

    public static final Relationship FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Errors were thrown.")
        .build();
        
    private Set<Relationship> relationships;
    private List<PropertyDescriptor> properties;

    @Override
    public void init(final ProcessorInitializationContext context) {
        relationships = new HashSet<>(Arrays.asList(SUCCESS, FAILURE));
        properties = Arrays.asList(IMMEDIATE_READ);
    }
    
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }
    
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }
    
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session)
            throws ProcessException {
        FlowFile incomingFlowFile = session.get();

        if (incomingFlowFile == null) {
            return;
        }

        try {
            // Parse the incoming JSON content into a Product
            final InfileReader inFileReader = new InfileReader();
            session.read(incomingFlowFile, inFileReader);
            Product product = inFileReader.getProduct();
            boolean transfer = false;

            getLogger().info("\tSession   :\n" + session);
            getLogger().info("\toriginal  :\n" + incomingFlowFile);

            // Clone, write and format twice; the failure appears on the second pass
            for (int i = 0; i < 2; i++) {
                transfer = manipulate(context, session, incomingFlowFile, product);
            }
        } catch (Exception e) {
            getLogger().error(e.getMessage(), e);
            session.rollback(true);
        }
    }
   
    private boolean manipulate(final ProcessContext context, final ProcessSession session,
            final FlowFile incomingFlowFile, final Product product) {

        boolean transfer = false;
        FlowFile outgoingFlowFile = null;
        boolean immediateRead = context.getProperty(IMMEDIATE_READ).asBoolean();
        try {
            // Clone incoming flowFile
            outgoingFlowFile = session.clone(incomingFlowFile);
            getLogger().info("\tclone outgoing :\n" + outgoingFlowFile);
            if (immediateRead) {
                readFlowFile(session, outgoingFlowFile);
            }

            // First write into clone
            StageOneWriter stage1Write = new StageOneWriter(product);
            outgoingFlowFile = session.write(outgoingFlowFile, stage1Write);
            getLogger().info("\twrite outgoing :\n" + outgoingFlowFile);

            // Format the cloned file with another write
            outgoingFlowFile = formatFlowFile(session, outgoingFlowFile);
            getLogger().info("\tformat outgoing :\n" + outgoingFlowFile);
            session.transfer(outgoingFlowFile, SUCCESS);
            transfer = true;
        } catch (Exception e) {
            getLogger().error(e.getMessage(), e);
            if (outgoingFlowFile != null) {
                session.remove(outgoingFlowFile);
            }
        }
        return transfer;
    }

    private void readFlowFile(final ProcessSession session, final FlowFile flowFile) {
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(final InputStream in) throws IOException {
                // Read the whole content as a single token and discard it
                try (Scanner scanner = new Scanner(in)) {
                    scanner.useDelimiter("\\A").next();
                }
            }
        });
    }
    
    private FlowFile formatFlowFile(final ProcessSession session, FlowFile flowFile) {
        OutputFormatWriter formatWriter = new OutputFormatWriter();
        flowFile = session.write(flowFile, formatWriter);
        return flowFile;
    }
    
    private static class OutputFormatWriter implements StreamCallback {
        @Override
        public void process(final InputStream in, final OutputStream out) throws IOException {
            try {
                // Copy the current content to the new content unchanged
                IOUtils.copy(in, out);
                out.flush();
            } finally {
                IOUtils.closeQuietly(in);
                IOUtils.closeQuietly(out);
            }
        }
    }
    
    private static class StageOneWriter implements OutputStreamCallback {
    
        private Product product = null;
        
        public StageOneWriter(Product product) {
            this.product = product;
        }
        
        @Override
        public void process(final OutputStream out) throws IOException {
            final Gson gson = new Gson();
            final String json = gson.toJson(product);
            out.write(json.getBytes());
        }
    }
    
    private static class InfileReader implements InputStreamCallback {

        private Product product = null;

        @Override
        public void process(final InputStream in) throws IOException {
            product = null;
            final Gson gson = new Gson();
            Reader inReader = new InputStreamReader(in, "UTF-8");
            product = gson.fromJson(inReader, Product.class);
        }

        public Product getProduct() {
            return product;
        }
    }
}


############ SampleCloningProcessorTest.java ###########################


package sample.processors.cloning;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;

public class SampleCloningProcessorTest {

    static final String flowFileContent = "{"
        + "\"cost\": \"cost 1\","
        + "\"description\": \"description 1\","
        + "\"markup\": 1.2,"
        + "\"name\": \"name 1\","
        + "\"supplier\": \"supplier 1\""
        + "}";
    
    private TestRunner testRunner;
    
    @Before
    public void init() {
        testRunner = TestRunners.newTestRunner(SampleCloningProcessor.class);
        testRunner.enqueue(flowFileContent);
    }
    
    @Test
    public void testProcessorImmediateRead() {
        testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "true");
        testRunner.run();
        testRunner.assertTransferCount("success", 2);
    }
    
    
    @Test
    public void testProcessorImmediateRead_false() {
        testRunner.setProperty(SampleCloningProcessor.IMMEDIATE_READ, "false");
        testRunner.run();
        testRunner.assertTransferCount("success", 2);
    }
}


########## Product.java ################

package sample.processors.cloning;

public class Product {
    
    private String name;
    private String description;
    private String supplier;
    private String cost;
    private float markup;
    
    public String getName() {
        return name;
    }
    
    public void setName(final String name) {
        this.name = name;
    }
    
    public String getDescription() {
        return description;
    }

    public void setDescription(final String description) {
        this.description = description;
    }
    
    public String getSupplier() {
        return supplier;
    }
    
    public void setSupplier(final String supplier) {
        this.supplier = supplier;
    }
    
    public String getCost() {
        return cost;
    }
    
    public void setCost(final String cost) {
        this.cost = cost;
    }
    
    public float getMarkup() {
        return markup;
    }
    
    public void setMarkup(final float markup) {
        this.markup = markup;
    }
}

########## product.json  - input file ##################

{
    "cost" : "cost 1",
    "description" : "description 1",
    "markup" : 1.2,
    "name" : "name 1",
    "supplier" : "supplier 1"
}
