Oops copy paste error, the GroovyScriptedRecordSetWriterFactory has to extend AbstractControllerService
Sent from my iPhone > On Aug 21, 2020, at 4:50 PM, David Early <[email protected]> wrote: > > > Matt, > > This is very cool of you, and I feel like this is close, but once again > hanging up on my inexperience with the Java environment for getting the pipes > all lined up. > > Below is my entire code. Note you originally had : > @Override > def init(context) { > mapCacheClient = > > context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient) > } > > But I think that should be "initialize" based on an examination of the > AbstractControllerService code. > > That said, this is giving me the following error when trying to start the > controller: > <g0ezr3fw.png> > > And it gives that whether I use "init" or "initialize" > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > import groovy.xml.MarkupBuilder > import org.apache.nifi.controller.AbstractControllerService > import org.apache.nifi.flowfile.FlowFile > import org.apache.nifi.logging.ComponentLog > import org.apache.nifi.processor.ProcessSession > import org.apache.nifi.schema.access.SchemaNotFoundException > import org.apache.nifi.serialization.RecordReaderFactory > import org.apache.nifi.serialization.RecordSetWriter > import org.apache.nifi.serialization.RecordSetWriterFactory > import org.apache.nifi.serialization.WriteResult > import org.apache.nifi.serialization.record.Record > import org.apache.nifi.serialization.record.RecordSchema > import org.apache.nifi.serialization.record.RecordSet > import org.apache.nifi.stream.io.NonCloseableOutputStream > import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient > import org.apache.nifi.distributed.cache.client.Serializer > import org.apache.nifi.distributed.cache.client.Deserializer > import java.nio.charset.StandardCharsets > import org.apache.nifi.processor.ProcessContext > > class GroovyRecordSetWriter implements RecordSetWriter { > private int recordCount = 0; > private int HASHCNT = 2; > private int PERIOD = 300000; > private int PERC_OFFSET = 0.95; > private final OutputStream out; > private final DistributedMapCacheClient mapCacheClient > > public GroovyRecordSetWriter(final OutputStream out) { > this.out = out; > this.mapCacheClient = mapCacheClient > } > > @Override > WriteResult write(Record r) throws IOException { > def mnhash = r.getValue('mn').toString().hashCode().mod(HASHCNT).abs() > > // USE MAP CACHE TO LOOK UP CACHE > // MODIFY RECORD INFO BASED ON ANALYSIS > new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> > new MarkupBuilder(osw).record { > r.schema.fieldNames.each {fieldName -> > "$fieldName" r.getValue(fieldName) > } > hash(mnhash.toString()) > } > }; > > recordCount++; > WriteResult.of(1,[:]) > } > > @Override > String getMimeType() { > return 'application/xml' > } > > @Override > WriteResult write(final RecordSet rs) throws IOException { > int count = 0 > new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> > new MarkupBuilder(osw).recordSet { > Record r > while (r = rs.next()) { > count++ > record { > rs.schema.fieldNames.each {fieldName -> > "$fieldName" r.getValue(fieldName) > } > } > } > } > } > WriteResult.of(count, [:]) > } > > public void beginRecordSet() throws IOException { > } > > @Override > public WriteResult finishRecordSet() throws IOException { > return WriteResult.of(recordCount, [:]); > } > > @Override > public void close() throws IOException { > } > > @Override > public void flush() throws IOException { > } > } > > class GroovyRecordSetWriterFactory extends AbstractControllerService > implements RecordSetWriterFactory { > > // Properties > def mapCacheClient > > // Properties > static final PropertyDescriptor CACHE_CLIENT = new > PropertyDescriptor.Builder() > .name("cache-client") > .displayName("Cache Client") > .description("Specifies the Controller Service to use for map cache") > .identifiesControllerService(DistributedMapCacheClient.class) > .required(true) > .build() > > @Override > protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { > def properties = [] as ArrayList > properties.add(CACHE_CLIENT) > properties > } > > @Override > def initialize(context) { > mapCacheClient = > > context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient) > } > > @Override > RecordSchema getSchema(Map<String, String> variables, RecordSchema > readSchema) throws SchemaNotFoundException, IOException { > return null > } > > @Override > RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, > OutputStream out, Map<String, String> variables) throws > SchemaNotFoundException, IOException { > return new GroovyRecordSetWriter(out, mapCacheClient) > } > > // > > } > > writer = new GroovyRecordSetWriterFactory() >
