Ugh I remember now, I think you're running into [1], it still needs to be done. As a workaround there might be an approach as you have with specifying the controller service ID as a property and then use "getControllerServiceLookup" to find the CS by ID.
Regards, Matt [1] https://issues.apache.org/jira/browse/NIFI-5115 On Mon, Aug 24, 2020 at 8:34 PM David Early <[email protected]> wrote: > Matt, > > I apologize for returning to the well, but we are truly stuck. > > Here is what I have: > > Controller configuration with the dynamic property with the ID of the > controller service: > > > Full code as it stands right now: > > /* > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > import groovy.xml.MarkupBuilder > import org.apache.nifi.components.PropertyDescriptor > import org.apache.nifi.controller.AbstractControllerService > import org.apache.nifi.controller.ControllerServiceInitializationContext > import org.apache.nifi.flowfile.FlowFile > import org.apache.nifi.logging.ComponentLog > import org.apache.nifi.processor.ProcessSession > import org.apache.nifi.reporting.InitializationException > import org.apache.nifi.schema.access.SchemaNotFoundException > import org.apache.nifi.serialization.RecordReaderFactory > import org.apache.nifi.serialization.RecordSetWriter > import org.apache.nifi.serialization.RecordSetWriterFactory > import org.apache.nifi.serialization.WriteResult > import org.apache.nifi.serialization.record.Record > import org.apache.nifi.serialization.record.RecordSchema > import org.apache.nifi.serialization.record.RecordSet > import org.apache.nifi.stream.io.NonCloseableOutputStream > import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient > import org.apache.nifi.distributed.cache.client.Serializer > import org.apache.nifi.distributed.cache.client.Deserializer > import org.codehaus.groovy.runtime.callsite.PerInstancePojoMetaClassSite > > import org.apache.nifi.distributed.cache.client.Serializer > import org.apache.nifi.distributed.cache.client.Deserializer > > import java.nio.charset.StandardCharsets > import org.apache.nifi.components.PropertyValue > > class GroovyRecordSetWriter extends AbstractControllerService implements > RecordSetWriter { > private int recordCount = 0 > private int HASHCNT = 2 > private final OutputStream out > private final DistributedMapCacheClient mapCacheClient > > GroovyRecordSetWriter(final OutputStream out, final mapCacheClient) { > this.out = out > this.mapCacheClient = mapCacheClient > } > > @Override > WriteResult write(Record r) throws IOException { > String mn = r.getValue("mn").toString() > > def mnhash = mn.toString().hashCode().mod(HASHCNT).abs() > > // USE MAP CACHE TO LOOK UP CACHE > //cache is mn:(avg,cnt,et,lt) > > def StringSerializer = {value, out -> > out.write(value.getBytes(StandardCharsets.UTF_8))} as > Serializer<String> > def StringDeserializer = { bytes -> new String(bytes) } as > Deserializer<String> > > def redis_entry = mapCacheClient.get(mn, StringSerializer, > StringDeserializer) > > if(redis_entry) { > def entry = redis_entry.split(",") > r.setValue("ts", entry[0]) > r.setValue("val", entry[2]) > > new OutputStreamWriter(new NonCloseableOutputStream(out)).with > {osw -> > new MarkupBuilder(osw).record { > r.schema.fieldNames.each {fieldName -> > "$fieldName" r.getValue(fieldName) > } > hash(mnhash.toString()) > } > } > > } > > recordCount++ > WriteResult.of(1,[:]) > } > > @Override > String getMimeType() { > return "application/xml" > } > > @Override > WriteResult write(final RecordSet rs) throws IOException { > int count = 0 > new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> > new MarkupBuilder(osw).recordSet { > Record r > while (r = rs.next()) { > count++ > record { > rs.schema.fieldNames.each {fieldName -> > "$fieldName" r.getValue(fieldName) > } > } > } > } > } > WriteResult.of(count, [:]) > } > > void beginRecordSet() throws IOException { > } > > @Override > WriteResult finishRecordSet() throws IOException { > return WriteResult.of(recordCount, [:]) > } > > @Override > void close() throws IOException { > } > > @Override > void flush() throws IOException { > } > } > > class GroovyRecordSetWriterFactory extends AbstractControllerService > implements RecordSetWriterFactory { > > def mapCacheClient > > // Properties > static final PropertyDescriptor CACHE_CLIENT = new > PropertyDescriptor.Builder() > .name("cache-client") > .displayName("Cache Client") > .description("Specifies the Controller Service to use for map > cache") > .defaultValue("35aa32f2-c57a-1a97-3b09-18186448fc78") > .identifiesControllerService(DistributedMapCacheClient.class) > .required(true) > .build() > > @Override > protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { > def properties = [] as ArrayList > properties.add(CACHE_CLIENT) > properties > } > > @Override > protected void init(ControllerServiceInitializationContext config) throws > InitializationException { > mapCacheClient = > getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient) > } > > @Override > RecordSchema getSchema(Map<String, String> variables, RecordSchema > readSchema) throws SchemaNotFoundException, IOException { > return null > } > > > @Override > RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, > OutputStream out, Map<String, String> variables) throws > SchemaNotFoundException, IOException { > return new GroovyRecordSetWriter(out, mapCacheClient) > } > > } > > writer = new GroovyRecordSetWriterFactory() > > The error we get: > > 2020-08-25 00:23:05,037 ERROR [Timer-Driven Process Thread-25] > o.a.n.processors.standard.ConvertRecord > ConvertRecord[id=35aa33e6-c57a-1a97-8b9f-87b432db3f4f] Failed to process > StandardFlowFileRecord[uuid=8018956e-43d5-446d-809a-721531affe0f,claim=StandardContentClaim > [resourceClaim=StandardResourceClaim[id=1598299104924-27305628, > container=default, section=668], offset=517679, > length=21822],offset=0,name=a3bde615-4c45-4da4-9537-f5ecdd5aa6be,size=21822]; > will route to failure: java.lang.NullPointerException: Cannot invoke method > get() on null object > > The only "get()" is on line 69 above, def redis_entry = > mapCacheClient.get(mn, > StringSerializer, StringDeserializer) > > Basically, that means that the mapCacheClient is null, but we can't figure > out what is broken. It has not helped that everything we have tried has > NOT been successful in trying to implement some logging. > > My local Java guy was wondering if the PropertyDescriptor definitions > needed to be in the actual class not the factory definitions. We tried, > but get the same "null object" error no matter what we do. > > The other question was the use of getSupportedDynamicPropertyDescriptor > rather than getSupportedPropertyDescriptor. > > We will continue to experiment, but it is a bit like wandering in the > dark. Your help has been great...thank you. > > Dave > > > ------ Original Message ------ > From: "Matt Burgess" <[email protected]> > To: [email protected] > Sent: 8/24/2020 4:43:12 PM > Subject: Re: Re[2]: access property in ScriptedRecordSetWriter? > > The formatting got a bit wonky on the code snippet you provided, but > if your GroovyRecordSetWriterFactory extends > AbstractControllerService, it should have access to the getProperty() > method. Try without the context, just > "getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)" > > On Mon, Aug 24, 2020 at 4:56 PM davide <[email protected]> wrote: > > > Matt, > > So we learned some....but not enough there. > > This snippet of code: > > @Override > def init(context) { > mapCacheClient = > > > context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient) > > The problem seems to get that the "getProperty().asControllerService()" set > of methods will only work on a ProcessorContext context object, and the one > we have here is actually a ControllerServiceInitializationContext. So > "getProperty()" doesn't work. The ControllerServiceInitializationContext > does have a "getProperties()" method that returns a Map but we have not > been > able to get the "asControllerService" method to return a valid controller > service. In everything we tried, it just returns a null. > > This appears to be a difference in how a controller service is initialized > and how a processor is initialized. We have not found any specific > documentation on the controller service process, are you aware of anything? > > Any further hints here? What are we missing? > > Dave > > > > > -- > Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/ > >
