This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 824cc0ed77b93d69a33358b1e8323102f30837ff
Author: Matthew Burgess <mattyb...@apache.org>
AuthorDate: Tue Jan 14 09:56:43 2020 -0500

    NIFI-7014: This closes #3985. Add RecordReader/Writer access in ExecuteGroovyScript
    
    Signed-off-by: Joe Witt <joew...@apache.org>
---
 .../nifi-groovyx-processors/pom.xml                | 17 ++++++
 .../processors/groovyx/ExecuteGroovyScript.java    | 65 ++++++++++++++++------
 .../additionalDetails.html                         | 12 ++++
 .../groovyx/ExecuteGroovyScriptTest.java           | 44 +++++++++++++++
 .../groovy/test_record_reader_writer.groovy        | 45 +++++++++++++++
 5 files changed, 166 insertions(+), 17 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml
index 94589bd..3adefbc 100644
--- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml
@@ -37,6 +37,11 @@
             <version>1.11.0-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-json</artifactId>
             <version>${nifi.groovy.version}</version>
@@ -66,6 +71,18 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.derby</groupId>
             <artifactId>derby</artifactId>
             <version>10.12.1.1</version>
diff --git 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
index f67df2b..5516aeb 100644
--- 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
+++ 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java
@@ -46,6 +46,8 @@ import 
org.apache.nifi.processors.groovyx.flow.GroovyProcessSessionWrap;
 import org.apache.nifi.processors.groovyx.sql.OSql;
 import org.apache.nifi.processors.groovyx.util.Files;
 import org.apache.nifi.processors.groovyx.util.Validators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.codehaus.groovy.control.CompilerConfiguration;
 import org.codehaus.groovy.runtime.ResourceGroovyMethods;
 import org.codehaus.groovy.runtime.StackTraceUtils;
@@ -80,8 +82,9 @@ import java.util.Set;
 @DynamicProperty(name = "A script engine property to update",
         value = "The value to set it to",
         expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
-        description = "Updates a script engine property specified by the Dynamic Property's key with the value "
-                + "specified by the Dynamic Property's value. Use `CTL.` to access any controller services.")
+        description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value. "
+                + "Use `CTL.` to access any controller services, `SQL.` to access any DBCPServices, `RecordReader.` to access RecordReaderFactory instances, or "
+                + "`RecordWriter.` to access any RecordSetWriterFactory instances.")
 public class ExecuteGroovyScript extends AbstractProcessor {
     public static final String GROOVY_CLASSPATH = "${groovy.classes.path}";
 
@@ -335,9 +338,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
     /**
      * init SQL variables from DBCP services
      */
-    @SuppressWarnings("unchecked")
-    private void onInitSQL(HashMap SQL) throws SQLException {
-        for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
+    private void onInitSQL(Map<String, Object> SQL) throws SQLException {
+        for (Map.Entry<String, Object> e : SQL.entrySet()) {
             DBCPService s = (DBCPService) e.getValue();
             OSql sql = new OSql(s.getConnection(Collections.emptyMap()));
             //try to set autocommit to false
@@ -355,9 +357,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
     /**
      * before commit SQL services
      */
-    @SuppressWarnings("unchecked")
-    private void onCommitSQL(HashMap SQL) throws SQLException {
-        for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
+    private void onCommitSQL(Map<String, Object> SQL) throws SQLException {
+        for (Map.Entry<String, Object> e : SQL.entrySet()) {
             OSql sql = (OSql) e.getValue();
             if (!sql.getConnection().getAutoCommit()) {
                 sql.commit();
@@ -368,9 +369,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
     /**
      * finalize SQL services. no exceptions should be thrown.
      */
-    @SuppressWarnings("unchecked")
-    private void onFinitSQL(HashMap SQL) {
-        for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
+    private void onFinitSQL(Map<String, Object> SQL) {
+        for (Map.Entry<String, Object> e : SQL.entrySet()) {
             OSql sql = (OSql) e.getValue();
             try {
                 if (!sql.getConnection().getAutoCommit()) {
@@ -391,9 +391,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
     /**
      * exception SQL services
      */
-    @SuppressWarnings("unchecked")
-    private void onFailSQL(HashMap SQL) {
-        for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
+    private void onFailSQL(Map<String, Object> SQL) {
+        for (Map.Entry<String, Object> e : SQL.entrySet()) {
             OSql sql = (OSql) e.getValue();
             try {
                 if (!sql.getConnection().getAutoCommit()) {
@@ -412,8 +411,10 @@ public class ExecuteGroovyScript extends AbstractProcessor 
{
         //so transfer original input to failure will be possible
         GroovyProcessSessionWrap session = new 
GroovyProcessSessionWrap(_session, toFailureOnError);
 
-        HashMap CTL = new AccessMap("CTL");
-        HashMap SQL = new AccessMap("SQL");
+        Map<String, Object> CTL = new AccessMap("CTL");
+        Map<String, Object> SQL = new AccessMap("SQL");
+        Map<String, Object> RECORD_READER = new AccessMap("RecordReader");
+        Map<String, Object> RECORD_SET_WRITER = new AccessMap("RecordSetWriter");
 
         try {
             Script script = getGroovyScript(); //compilation must be moved to validation
@@ -431,6 +432,14 @@ public class ExecuteGroovyScript extends AbstractProcessor 
{
                     } else if (property.getKey().getName().startsWith("SQL.")) 
{
                         DBCPService dbcp = 
context.getProperty(property.getKey()).asControllerService(DBCPService.class);
                         SQL.put(property.getKey().getName().substring(4), 
dbcp);
+                    } else if (property.getKey().getName().startsWith("RecordReader.")) {
+                        // Get RecordReaderFactory controller service
+                        RecordReaderFactory recordReader = context.getProperty(property.getKey()).asControllerService(RecordReaderFactory.class);
+                        RECORD_READER.put(property.getKey().getName().substring(13), recordReader);
+                    } else if (property.getKey().getName().startsWith("RecordWriter.")) {
+                        // Get RecordSetWriterFactory controller service
+                        RecordSetWriterFactory recordWriter = context.getProperty(property.getKey()).asControllerService(RecordSetWriterFactory.class);
+                        RECORD_SET_WRITER.put(property.getKey().getName().substring(13), recordWriter);
                     } else {
                         // Add the dynamic property bound to its full 
PropertyValue to the script engine
                         if (property.getValue() != null) {
@@ -448,6 +457,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
             bindings.put("REL_FAILURE", REL_FAILURE);
             bindings.put("CTL", CTL);
             bindings.put("SQL", SQL);
+            bindings.put("RecordReader", RECORD_READER);
+            bindings.put("RecordWriter", RECORD_SET_WRITER);
 
             script.run();
             bindings.clear();
@@ -496,6 +507,26 @@ public class ExecuteGroovyScript extends AbstractProcessor 
{
                     .identifiesControllerService(DBCPService.class)
                     .build();
         }
+        if (propertyDescriptorName.startsWith("RecordReader.")) {
+            return new PropertyDescriptor.Builder()
+                    .name(propertyDescriptorName)
+                    .displayName(propertyDescriptorName)
+                    .required(false)
+                    .description("RecordReaderFactory controller service accessible from code as `" + propertyDescriptorName + "`")
+                    .dynamic(true)
+                    .identifiesControllerService(RecordReaderFactory.class)
+                    .build();
+        }
+        if (propertyDescriptorName.startsWith("RecordWriter.")) {
+            return new PropertyDescriptor.Builder()
+                    .name(propertyDescriptorName)
+                    .displayName(propertyDescriptorName)
+                    .required(false)
+                    .description("RecordSetWriterFactory controller service accessible from code as `" + propertyDescriptorName + "`")
+                    .dynamic(true)
+                    .identifiesControllerService(RecordSetWriterFactory.class)
+                    .build();
+        }
         return new PropertyDescriptor.Builder()
                 .name(propertyDescriptorName)
                 .required(false)
@@ -506,7 +537,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
     }
 
     /** simple HashMap with exception on access of non-existent key */
-    private class AccessMap extends HashMap {
+    private static class AccessMap extends HashMap<String,Object> {
         private String parentKey;
         AccessMap(String parentKey){
             this.parentKey=parentKey;
diff --git 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html
 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html
index 3adbb36..5239aac 100644
--- 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html
@@ -65,6 +65,18 @@
        <br/>The `SQL.` prefixed properties could be linked only to 
DBCPSercice.</td>
 </tr>
 <tr>
+    <td>RecordReader</td>
+    <td>java.util.HashMap&lt;String,<a href="https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java">RecordReaderFactory</a>&gt;</td>
+    <td>Map populated with controller services defined with `RecordReader.*` processor properties.
+        <br/>The `RecordReader.` prefixed properties are to be linked to RecordReaderFactory controller service instances.</td>
+</tr>
+<tr>
+    <td>RecordWriter</td>
+    <td>java.util.HashMap&lt;String,<a href="https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java">RecordSetWriterFactory</a>&gt;</td>
+    <td>Map populated with controller services defined with `RecordWriter.*` processor properties.
+        <br/>The `RecordWriter.` prefixed properties are to be linked to RecordSetWriterFactory controller service instances.</td>
+</tr>
+<tr>
        <td>Dynamic processor properties</td>
        <td>org.apache.nifi.components.PropertyDescriptor</td>
        <td>All processor properties not started with `CTL.` or `SQL.` are 
bound to script variables</td>
diff --git 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
index b2b3e2d..ddba34a 100644
--- 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
+++ 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java
@@ -16,6 +16,13 @@
  */
 package org.apache.nifi.processors.groovyx;
 
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.MockProcessorInitializationContext;
@@ -38,6 +45,7 @@ import java.io.FileInputStream;
 
 import java.nio.charset.StandardCharsets;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.HashMap;
 
@@ -61,6 +69,9 @@ public class ExecuteGroovyScriptTest {
 
     protected TestRunner runner;
     protected static DBCPService dbcp = null;  //to make single initialization
+    protected MockRecordParser recordParser = null;
+    protected RecordSetWriterFactory recordWriter = null;
+    protected RecordSchema recordSchema = null;
     protected ExecuteGroovyScript proc;
     public final String TEST_RESOURCE_LOCATION = 
"target/test/resources/groovy/";
     private final String TEST_CSV_DATA = "gender,title,first,last\n"
@@ -121,6 +132,21 @@ public class ExecuteGroovyScriptTest {
         runner = TestRunners.newTestRunner(proc);
         runner.addControllerService("dbcp", dbcp, new HashMap<>());
         runner.enableControllerService(dbcp);
+
+        List<RecordField> recordFields = Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("name", RecordFieldType.STRING.getDataType()),
+                new RecordField("code", RecordFieldType.INT.getDataType()));
+        recordSchema = new SimpleRecordSchema(recordFields);
+
+        recordParser = new MockRecordParser();
+        recordFields.forEach((r) -> recordParser.addSchemaField(r));
+        runner.addControllerService("myreader", recordParser, new HashMap<>());
+        runner.enableControllerService(recordParser);
+
+        recordWriter = new MockRecordWriter();
+        runner.addControllerService("mywriter", recordWriter, new HashMap<>());
+        runner.enableControllerService(recordWriter);
     }
 
     /**
@@ -225,6 +251,7 @@ public class ExecuteGroovyScriptTest {
         runner.setProperty(proc.SCRIPT_BODY, " { { ");
         runner.assertNotValid();
     }
+
     //---------------------------------------------------------
     @Test
     public void test_ctl_01_access() throws Exception {
@@ -310,6 +337,23 @@ public class ExecuteGroovyScriptTest {
     }
 
     @Test
+    public void test_record_reader_writer_access() throws Exception {
+        runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_record_reader_writer.groovy");
+        runner.setProperty("RecordReader.myreader", "myreader"); //pass myreader as a service to script
+        runner.setProperty("RecordWriter.mywriter", "mywriter"); //pass mywriter as a service to script
+        runner.assertValid();
+
+        recordParser.addRecord(1, "A", "XYZ");
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
+        final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS.getName());
+        MockFlowFile resultFile = result.get(0);
+        resultFile.assertContentEquals("\"1\",\"A\",\"XYZ\"\n", "UTF-8");
+    }
+
+    @Test
     public void test_filter_01() throws Exception {
         runner.setProperty(proc.SCRIPT_BODY, "def ff = 
session.get{it.FILTER=='3'}; if(!ff)return; REL_SUCCESS << ff;");
         //runner.setProperty(proc.FAIL_STRATEGY, "rollback");
diff --git 
a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_record_reader_writer.groovy
 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_record_reader_writer.groovy
new file mode 100644
index 0000000..aec564d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_record_reader_writer.groovy
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.nifi.logging.ComponentLog
+import org.apache.nifi.serialization.RecordReaderFactory
+import org.apache.nifi.processor.io.StreamCallback
+import org.apache.nifi.serialization.RecordSetWriterFactory
+
+
+//just check that it's possible to access controller services
+def ff = session.get()
+if (!ff) return
+def readerFactory = RecordReader.myreader
+assert readerFactory instanceof RecordReaderFactory
+def writerFactory = RecordWriter.mywriter
+assert writerFactory instanceof RecordSetWriterFactory
+
+session.write(ff, { inStream, outStream ->
+    def variables = new HashMap<String, String>(ff.attributes)
+    def recordReader = readerFactory.createRecordReader(variables, inStream, -1L, log)
+    def recordWriter = writerFactory.createWriter(log, recordReader.schema, outStream, variables)
+    def record = null
+    recordWriter.beginRecordSet()
+    while (record = recordReader.nextRecord()) {
+        recordWriter.write(record)
+    }
+    recordWriter.finishRecordSet()
+} as StreamCallback)
+
+REL_SUCCESS << ff

Reply via email to