Matt,

I apologize for returning to the well, but we are truly stuck.

Here is what I have:

Controller configuration with the dynamic property with the ID of the controller service:


Full code as it stands right now:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import groovy.xml.MarkupBuilder
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.Deserializer

import java.nio.charset.StandardCharsets
import org.apache.nifi.components.PropertyValue

class GroovyRecordSetWriter extends AbstractControllerService implements RecordSetWriter {
    private int recordCount = 0
    private int HASHCNT = 2
    private final OutputStream out
    private final DistributedMapCacheClient mapCacheClient

    GroovyRecordSetWriter(final OutputStream out, final mapCacheClient) {
        this.out = out
        this.mapCacheClient = mapCacheClient
    }

    @Override
    WriteResult write(Record r) throws IOException {
        String mn = r.getValue("mn").toString()

        // Bucket the "mn" value into one of HASHCNT hash buckets
        def mnhash = mn.hashCode().mod(HASHCNT).abs()

        // USE MAP CACHE TO LOOK UP CACHE
        // cache is mn:(avg,cnt,et,lt)

        // Closures coerced to the cache client's Serializer/Deserializer interfaces
        def StringSerializer = { value, out ->
            out.write(value.getBytes(StandardCharsets.UTF_8))
        } as Serializer<String>
        def StringDeserializer = { bytes -> new String(bytes, StandardCharsets.UTF_8) } as Deserializer<String>

        def redis_entry = mapCacheClient.get(mn, StringSerializer, StringDeserializer)

        if (redis_entry) {
            def entry = redis_entry.split(",")
            r.setValue("ts", entry[0])
            r.setValue("val", entry[2])

            new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
                new MarkupBuilder(osw).record {
                    r.schema.fieldNames.each { fieldName ->
                        "$fieldName" r.getValue(fieldName)
                    }
                    hash(mnhash.toString())
                }
            }
        }

        recordCount++
        WriteResult.of(1, [:])
    }

    @Override
    String getMimeType() {
        return "application/xml"
    }

    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with { osw ->
            new MarkupBuilder(osw).recordSet {
                Record r
                while (r = rs.next()) {
                    count++
                    record {
                        rs.schema.fieldNames.each { fieldName ->
                            "$fieldName" r.getValue(fieldName)
                        }
                    }
                }
            }
        }
        WriteResult.of(count, [:])
    }

    void beginRecordSet() throws IOException {
    }

    @Override
    WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:])
    }

    @Override
    void close() throws IOException {
    }

    @Override
    void flush() throws IOException {
    }
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

    def mapCacheClient

    // Properties
    static final PropertyDescriptor CACHE_CLIENT = new PropertyDescriptor.Builder()
            .name("cache-client")
            .displayName("Cache Client")
            .description("Specifies the Controller Service to use for map cache")
            .defaultValue("35aa32f2-c57a-1a97-3b09-18186448fc78")
            .identifiesControllerService(DistributedMapCacheClient.class)
            .required(true)
            .build()

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        def properties = [] as ArrayList
        properties.add(CACHE_CLIENT)
        properties
    }

    @Override
    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
        mapCacheClient = getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
    }

    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        return null
    }

    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out, mapCacheClient)
    }

}

writer = new GroovyRecordSetWriterFactory()

The error we get:

2020-08-25 00:23:05,037 ERROR [Timer-Driven Process Thread-25] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=35aa33e6-c57a-1a97-8b9f-87b432db3f4f] Failed to process StandardFlowFileRecord[uuid=8018956e-43d5-446d-809a-721531affe0f,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1598299104924-27305628, container=default, section=668], offset=517679, length=21822],offset=0,name=a3bde615-4c45-4da4-9537-f5ecdd5aa6be,size=21822]; will route to failure: java.lang.NullPointerException: Cannot invoke method get() on null object

The only "get()" call in the script is the cache lookup in write(Record): def redis_entry = mapCacheClient.get(mn, StringSerializer, StringDeserializer)

Basically, that means that mapCacheClient is null, but we can't figure out what is broken. It has not helped that none of our attempts to add logging have worked.
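
To show what that message means in isolation, here is a minimal plain-Groovy sketch that needs no NiFi classes. It is an illustration only: CacheLookup and the Map used in place of the cache client are hypothetical stand-ins, not part of our script. It reproduces a call on a null receiver and adds a guard that fails with a more descriptive message.

```groovy
// Hypothetical stand-in for the writer: holds whatever "client" the factory handed over.
class CacheLookup {
    private final Object mapCacheClient   // null if the factory never resolved the service

    CacheLookup(Object mapCacheClient) {
        this.mapCacheClient = mapCacheClient
    }

    String lookup(String key) {
        // Fail fast with a descriptive message instead of a bare NullPointerException.
        if (mapCacheClient == null) {
            throw new IllegalStateException("mapCacheClient is null; the factory did not resolve the cache service")
        }
        mapCacheClient.get(key)
    }
}

// 1. Unguarded call on a null receiver, as in our write(Record) method.
String unguardedMessage = null
try {
    def client = null
    client.get("mn")
} catch (NullPointerException e) {
    unguardedMessage = e.message
}

// 2. Guarded call: same root cause, clearer error.
String guardedMessage = null
try {
    new CacheLookup(null).lookup("mn")
} catch (IllegalStateException e) {
    guardedMessage = e.message
}

// 3. With a non-null client (a plain Map here, assumed for illustration) the lookup works.
String found = new CacheLookup([mn: "1.5,3,100,200"]).lookup("mn")

println unguardedMessage
println guardedMessage
println found
```

In the real script the same kind of guard could sit at the top of write(Record) or in createWriter. That is a suggestion only; we have not tested it in NiFi.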

My local Java guy was wondering if the PropertyDescriptor definitions needed to be in the writer class itself rather than in the factory. We tried that, but we get the same "null object" error no matter what we do.

The other question was whether we should be using getSupportedDynamicPropertyDescriptor rather than getSupportedPropertyDescriptors.

We will continue to experiment, but it is a bit like wandering in the dark. Your help has been great...thank you.

Dave


------ Original Message ------
From: "Matt Burgess" <[email protected]>
To: [email protected]
Sent: 8/24/2020 4:43:12 PM
Subject: Re: Re[2]: access property in ScriptedRecordSetWriter?

The formatting got a bit wonky on the code snippet you provided, but
if your GroovyRecordSetWriterFactory extends
AbstractControllerService, it should have access to the getProperty()
method. Try without the context, just
"getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)"

On Mon, Aug 24, 2020 at 4:56 PM davide <[email protected]> wrote:

 Matt,

 So we learned some... but not enough.

 This snippet of code:

   @Override
   def init(context) {
        mapCacheClient =
            context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)

 The problem seems to be that the "getProperty().asControllerService()" chain
 of methods only works on a ProcessContext object, and the one
 we have here is actually a ControllerServiceInitializationContext.  So
 "getProperty()" doesn't work.  The ControllerServiceInitializationContext
 does have a "getProperties()" method that returns a Map, but we have not been
 able to get the "asControllerService" method to return a valid controller
 service.  Everything we tried just returns null.

 This appears to be a difference between how a controller service is initialized
 and how a processor is initialized.  We have not found any specific
 documentation on the controller service process.  Are you aware of anything?

 Any further hints here?  What are we missing?

 Dave




