This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 824cc0ed77b93d69a33358b1e8323102f30837ff Author: Matthew Burgess <mattyb...@apache.org> AuthorDate: Tue Jan 14 09:56:43 2020 -0500 NIFI-7014: This closes #3985. Add RecordReader/Writer access in ExecuteGroovyScript Signed-off-by: Joe Witt <joew...@apache.org> --- .../nifi-groovyx-processors/pom.xml | 17 ++++++ .../processors/groovyx/ExecuteGroovyScript.java | 65 ++++++++++++++++------ .../additionalDetails.html | 12 ++++ .../groovyx/ExecuteGroovyScriptTest.java | 44 +++++++++++++++ .../groovy/test_record_reader_writer.groovy | 45 +++++++++++++++ 5 files changed, 166 insertions(+), 17 deletions(-) diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml index 94589bd..3adefbc 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/pom.xml @@ -37,6 +37,11 @@ <version>1.11.0-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + <version>1.11.0-SNAPSHOT</version> + </dependency> + <dependency> <groupId>org.codehaus.groovy</groupId> <artifactId>groovy-json</artifactId> <version>${nifi.groovy.version}</version> @@ -66,6 +71,18 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + <version>1.11.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + <version>1.11.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> <version>10.12.1.1</version> diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java index f67df2b..5516aeb 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScript.java @@ -46,6 +46,8 @@ import org.apache.nifi.processors.groovyx.flow.GroovyProcessSessionWrap; import org.apache.nifi.processors.groovyx.sql.OSql; import org.apache.nifi.processors.groovyx.util.Files; import org.apache.nifi.processors.groovyx.util.Validators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; import org.codehaus.groovy.control.CompilerConfiguration; import org.codehaus.groovy.runtime.ResourceGroovyMethods; import org.codehaus.groovy.runtime.StackTraceUtils; @@ -80,8 +82,9 @@ import java.util.Set; @DynamicProperty(name = "A script engine property to update", value = "The value to set it to", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, - description = "Updates a script engine property specified by the Dynamic Property's key with the value " - + "specified by the Dynamic Property's value. Use `CTL.` to access any controller services.") + description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value. " + + "Use `CTL.` to access any controller services, `SQL.` to access any DBCPServices, `RecordReader.` to access RecordReaderFactory instances, or " + + "`RecordWriter.` to access any RecordSetWriterFactory instances.") public class ExecuteGroovyScript extends AbstractProcessor { public static final String GROOVY_CLASSPATH = "${groovy.classes.path}"; @@ -335,9 +338,8 @@ public class ExecuteGroovyScript extends AbstractProcessor { /** * init SQL variables from DBCP services */ - @SuppressWarnings("unchecked") - private void onInitSQL(HashMap SQL) throws SQLException { - for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) { + private void onInitSQL(Map<String, Object> SQL) throws SQLException { + for (Map.Entry<String, Object> e : SQL.entrySet()) { DBCPService s = (DBCPService) e.getValue(); OSql sql = new OSql(s.getConnection(Collections.emptyMap())); //try to set autocommit to false @@ -355,9 +357,8 @@ public class ExecuteGroovyScript extends AbstractProcessor { /** * before commit SQL services */ - @SuppressWarnings("unchecked") - private void onCommitSQL(HashMap SQL) throws SQLException { - for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) { + private void onCommitSQL(Map<String, Object> SQL) throws SQLException { + for (Map.Entry<String, Object> e : SQL.entrySet()) { OSql sql = (OSql) e.getValue(); if (!sql.getConnection().getAutoCommit()) { sql.commit(); @@ -368,9 +369,8 @@ public class ExecuteGroovyScript extends AbstractProcessor { /** * finalize SQL services. no exceptions should be thrown. */ - @SuppressWarnings("unchecked") - private void onFinitSQL(HashMap SQL) { - for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) { + private void onFinitSQL(Map<String, Object> SQL) { + for (Map.Entry<String, Object> e : SQL.entrySet()) { OSql sql = (OSql) e.getValue(); try { if (!sql.getConnection().getAutoCommit()) { @@ -391,9 +391,8 @@ public class ExecuteGroovyScript extends AbstractProcessor { /** * exception SQL services */ - @SuppressWarnings("unchecked") - private void onFailSQL(HashMap SQL) { - for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) { + private void onFailSQL(Map<String, Object> SQL) { + for (Map.Entry<String, Object> e : SQL.entrySet()) { OSql sql = (OSql) e.getValue(); try { if (!sql.getConnection().getAutoCommit()) { @@ -412,8 +411,10 @@ public class ExecuteGroovyScript extends AbstractProcessor { //so transfer original input to failure will be possible GroovyProcessSessionWrap session = new GroovyProcessSessionWrap(_session, toFailureOnError); - HashMap CTL = new AccessMap("CTL"); - HashMap SQL = new AccessMap("SQL"); + Map<String, Object> CTL = new AccessMap("CTL"); + Map<String, Object> SQL = new AccessMap("SQL"); + Map<String, Object> RECORD_READER = new AccessMap("RecordReader"); + Map<String, Object> RECORD_SET_WRITER = new AccessMap("RecordSetWriter"); try { Script script = getGroovyScript(); //compilation must be moved to validation @@ -431,6 +432,14 @@ public class ExecuteGroovyScript extends AbstractProcessor { } else if (property.getKey().getName().startsWith("SQL.")) { DBCPService dbcp = context.getProperty(property.getKey()).asControllerService(DBCPService.class); SQL.put(property.getKey().getName().substring(4), dbcp); + } else if (property.getKey().getName().startsWith("RecordReader.")) { + // Get RecordReaderFactory controller service + RecordReaderFactory recordReader = context.getProperty(property.getKey()).asControllerService(RecordReaderFactory.class); + RECORD_READER.put(property.getKey().getName().substring(13), recordReader); + } else if (property.getKey().getName().startsWith("RecordWriter.")) { + // Get RecordWriterFactory controller service + RecordSetWriterFactory recordWriter = context.getProperty(property.getKey()).asControllerService(RecordSetWriterFactory.class); + RECORD_SET_WRITER.put(property.getKey().getName().substring(13), recordWriter); } else { // Add the dynamic property bound to its full PropertyValue to the script engine if (property.getValue() != null) { @@ -448,6 +457,8 @@ public class ExecuteGroovyScript extends AbstractProcessor { bindings.put("REL_FAILURE", REL_FAILURE); bindings.put("CTL", CTL); bindings.put("SQL", SQL); + bindings.put("RecordReader", RECORD_READER); + bindings.put("RecordWriter", RECORD_SET_WRITER); script.run(); bindings.clear(); @@ -496,6 +507,26 @@ public class ExecuteGroovyScript extends AbstractProcessor { .identifiesControllerService(DBCPService.class) .build(); } + if (propertyDescriptorName.startsWith("RecordReader.")) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .displayName(propertyDescriptorName) + .required(false) + .description("RecordReaderFactory controller service accessible from code as `" + propertyDescriptorName + "`") + .dynamic(true) + .identifiesControllerService(RecordReaderFactory.class) + .build(); + } + if (propertyDescriptorName.startsWith("RecordWriter.")) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .displayName(propertyDescriptorName) + .required(false) + .description("RecordSetWriterFactory controller service accessible from code as `" + propertyDescriptorName + "`") + .dynamic(true) + .identifiesControllerService(RecordSetWriterFactory.class) + .build(); + } return new PropertyDescriptor.Builder() .name(propertyDescriptorName) .required(false) @@ -506,7 +537,7 @@ public class ExecuteGroovyScript extends AbstractProcessor { } /** simple HashMap with exception on access of non-existent key */ - private class AccessMap extends HashMap { + private static class AccessMap extends HashMap<String,Object> { private String parentKey; AccessMap(String parentKey){ this.parentKey=parentKey; diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html index 3adbb36..5239aac 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/main/resources/docs/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html @@ -65,6 +65,18 @@ <br/>The `SQL.` prefixed properties could be linked only to DBCPSercice.</td> </tr> <tr> + <td>RecordReader</td> + <td>java.util.HashMap<String,<a href="https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java">RecordReaderFactory</a>></td> + <td>Map populated with controller services defined with `RecordReader.*` processor properties. + <br/>The `RecordReader.` prefixed properties are to be linked to RecordReaderFactory controller service instances.</td> +</tr> +<tr> + <td>RecordWriter</td> + <td>java.util.HashMap<String,<a href="https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java">RecordSetWriterFactory</a>></td> + <td>Map populated with controller services defined with `RecordWriter.*` processor properties. + <br/>The `RecordWriter.` prefixed properties are to be linked to RecordSetWriterFactory controller service instances.</td> +</tr> +<tr> <td>Dynamic processor properties</td> <td>org.apache.nifi.components.PropertyDescriptor</td> <td>All processor properties not started with `CTL.` or `SQL.` are bound to script variables</td> diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java index b2b3e2d..ddba34a 100644 --- a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/java/org/apache/nifi/processors/groovyx/ExecuteGroovyScriptTest.java @@ -16,6 +16,13 @@ */ package org.apache.nifi.processors.groovyx; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockProcessorInitializationContext; @@ -38,6 +45,7 @@ import java.io.FileInputStream; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.HashMap; @@ -61,6 +69,9 @@ public class ExecuteGroovyScriptTest { protected TestRunner runner; protected static DBCPService dbcp = null; //to make single initialization + protected MockRecordParser recordParser = null; + protected RecordSetWriterFactory recordWriter = null; + protected RecordSchema recordSchema = null; protected ExecuteGroovyScript proc; public final String TEST_RESOURCE_LOCATION = "target/test/resources/groovy/"; private final String TEST_CSV_DATA = "gender,title,first,last\n" @@ -121,6 +132,21 @@ public class ExecuteGroovyScriptTest { runner = TestRunners.newTestRunner(proc); runner.addControllerService("dbcp", dbcp, new HashMap<>()); runner.enableControllerService(dbcp); + + List<RecordField> recordFields = Arrays.asList( + new RecordField("id", RecordFieldType.INT.getDataType()), + new RecordField("name", RecordFieldType.STRING.getDataType()), + new RecordField("code", RecordFieldType.INT.getDataType())); + recordSchema = new SimpleRecordSchema(recordFields); + + recordParser = new MockRecordParser(); + recordFields.forEach((r) -> recordParser.addSchemaField(r)); + runner.addControllerService("myreader", recordParser, new HashMap<>()); + runner.enableControllerService(recordParser); + + recordWriter = new MockRecordWriter(); + runner.addControllerService("mywriter", recordWriter, new HashMap<>()); + runner.enableControllerService(recordWriter); } /** @@ -225,6 +251,7 @@ public class ExecuteGroovyScriptTest { runner.setProperty(proc.SCRIPT_BODY, " { { "); runner.assertNotValid(); } + //--------------------------------------------------------- @Test public void test_ctl_01_access() throws Exception { @@ -310,6 +337,23 @@ public class ExecuteGroovyScriptTest { } @Test + public void test_record_reader_writer_access() throws Exception { + runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_record_reader_writer.groovy"); + runner.setProperty("RecordReader.myreader", "myreader"); //pass myreader as a service to script + runner.setProperty("RecordWriter.mywriter", "mywriter"); //pass mywriter as a service to script + runner.assertValid(); + + recordParser.addRecord(1, "A", "XYZ"); + runner.enqueue(""); + runner.run(); + + runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1); + final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_SUCCESS.getName()); + MockFlowFile resultFile = result.get(0); + resultFile.assertContentEquals("\"1\",\"A\",\"XYZ\"\n", "UTF-8"); + } + + @Test public void test_filter_01() throws Exception { runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get{it.FILTER=='3'}; if(!ff)return; REL_SUCCESS << ff;"); //runner.setProperty(proc.FAIL_STRATEGY, "rollback"); diff --git a/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_record_reader_writer.groovy b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_record_reader_writer.groovy new file mode 100644 index 0000000..aec564d --- /dev/null +++ b/nifi-nar-bundles/nifi-groovyx-bundle/nifi-groovyx-processors/src/test/resources/groovy/test_record_reader_writer.groovy @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import org.apache.nifi.logging.ComponentLog +import org.apache.nifi.serialization.RecordReaderFactory +import org.apache.nifi.processor.io.StreamCallback +import org.apache.nifi.serialization.RecordSetWriterFactory + + +//just check that it's possible to access controller services +def ff = session.get() +if (!ff) return +def readerFactory = RecordReader.myreader +assert readerFactory instanceof RecordReaderFactory +def writerFactory = RecordWriter.mywriter +assert writerFactory instanceof RecordSetWriterFactory + +session.write(ff, { inStream, outStream -> + def variables = new HashMap<String, String>(ff.attributes) + def recordReader = readerFactory.createRecordReader(variables, inStream, -1L, log) + def recordWriter = writerFactory.createWriter(log, recordReader.schema, outStream, variables) + def record = null + recordWriter.beginRecordSet() + while (record = recordReader.nextRecord()) { + recordWriter.write(record) + } + recordWriter.finishRecordSet() +} as StreamCallback) + +REL_SUCCESS << ff