Matt, The method "initialization" in AbstractControllerService is this:
@Override
public final void initialize(final ControllerServiceInitializationContext
context) throws InitializationException {
this.identifier = context.getIdentifier();
serviceLookup = context.getControllerServiceLookup();
logger = context.getLogger();
stateManager = context.getStateManager();
nodeTypeProvider = context.getNodeTypeProvider();
init(context);
}
My (limited) understanding is that if this is labeled "final", then I
cannot do an @Override on it. Is this why the thing is hung up on my
attempt to override "initialization"?
Dave ------ Original Message ------ From: "Matt Burgess" <[email protected]> To: [email protected] Sent: 8/21/2020 3:39:24 PM Subject: Re: access property in ScriptedRecordSetWriter?
Oops copy paste error, the GroovyScriptedRecordSetWriterFactory has to extend AbstractControllerServiceSent from my iPhoneOn Aug 21, 2020, at 4:50 PM, David Early <[email protected]> wrote: Matt,This is very cool of you, and I feel like this is close, but once again hanging up on my inexperience with the Java environment for getting the pipes all lined up.Below is my entire code. Note you originally had : @Override def init(context) { mapCacheClient =context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)}But I think that should be "initialize" based on an examination of the AbstractControllerService code.That said, this is giving me the following error when trying to start the controller:<g0ezr3fw.png> And it gives that whether I use "init" or "initialize" /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import groovy.xml.MarkupBuilder import org.apache.nifi.controller.AbstractControllerService import org.apache.nifi.flowfile.FlowFile import org.apache.nifi.logging.ComponentLog import org.apache.nifi.processor.ProcessSession import org.apache.nifi.schema.access.SchemaNotFoundException import org.apache.nifi.serialization.RecordReaderFactory import org.apache.nifi.serialization.RecordSetWriter import org.apache.nifi.serialization.RecordSetWriterFactory import org.apache.nifi.serialization.WriteResult import org.apache.nifi.serialization.record.Record import org.apache.nifi.serialization.record.RecordSchema import org.apache.nifi.serialization.record.RecordSet import org.apache.nifi.stream.io.NonCloseableOutputStream import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient import org.apache.nifi.distributed.cache.client.Serializer import org.apache.nifi.distributed.cache.client.Deserializer import java.nio.charset.StandardCharsets import org.apache.nifi.processor.ProcessContext class GroovyRecordSetWriter implements RecordSetWriter { private int recordCount = 0; private int HASHCNT = 2; private int PERIOD = 300000; private int PERC_OFFSET = 0.95; private final OutputStream out; private final DistributedMapCacheClient mapCacheClient public GroovyRecordSetWriter(final OutputStream out) { this.out = out; this.mapCacheClient = mapCacheClient } @Override WriteResult write(Record r) throws IOException { def mnhash = r.getValue('mn').toString().hashCode().mod(HASHCNT).abs() // USE MAP CACHE TO LOOK UP CACHE // MODIFY RECORD INFO BASED ON ANALYSIS new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> new MarkupBuilder(osw).record { r.schema.fieldNames.each {fieldName -> "$fieldName" r.getValue(fieldName) } hash(mnhash.toString()) } }; recordCount++; WriteResult.of(1,[:]) } @Override String getMimeType() { return 'application/xml' } @Override WriteResult write(final RecordSet rs) throws IOException { int count = 0 new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> new MarkupBuilder(osw).recordSet { Record r while (r = rs.next()) { count++ record { rs.schema.fieldNames.each {fieldName -> "$fieldName" r.getValue(fieldName) } } } } } WriteResult.of(count, [:]) } public void beginRecordSet() throws IOException { } @Override public WriteResult finishRecordSet() throws IOException { return WriteResult.of(recordCount, [:]); } @Override public void close() throws IOException { } @Override public void flush() throws IOException { } } class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory { // Properties def mapCacheClient // Properties static final PropertyDescriptor CACHE_CLIENT = new PropertyDescriptor.Builder() .name("cache-client") .displayName("Cache Client") .description("Specifies the Controller Service to use for map cache") .identifiesControllerService(DistributedMapCacheClient.class) .required(true) .build() @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { def properties = [] as ArrayList properties.add(CACHE_CLIENT) properties } @Override def initialize(context) { mapCacheClient = context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient) } @Override RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException { return null } @Override RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException { return new GroovyRecordSetWriter(out, mapCacheClient) } // } writer = new GroovyRecordSetWriterFactory()
