Matt,
I apologize for returning to the well, but we are truly stuck.
Here is what I have:
Controller configuration with the dynamic property with the ID of the
controller service:
Full code as it stands right now:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.xml.MarkupBuilder
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.Deserializer
import org.codehaus.groovy.runtime.callsite.PerInstancePojoMetaClassSite
import java.nio.charset.StandardCharsets
import org.apache.nifi.components.PropertyValue
class GroovyRecordSetWriter extends AbstractControllerService implements RecordSetWriter {
    private int recordCount = 0
    private int HASHCNT = 2
    private final OutputStream out
    private final DistributedMapCacheClient mapCacheClient
    GroovyRecordSetWriter(final OutputStream out, final mapCacheClient) {
        this.out = out
        this.mapCacheClient = mapCacheClient
    }
    @Override
    WriteResult write(Record r) throws IOException {
        String mn = r.getValue("mn").toString()
        def mnhash = mn.toString().hashCode().mod(HASHCNT).abs()
        // USE MAP CACHE TO LOOK UP CACHE
        //cache is mn:(avg,cnt,et,lt)
        def StringSerializer = {value, out ->
            out.write(value.getBytes(StandardCharsets.UTF_8))} as Serializer<String>
        def StringDeserializer = { bytes -> new String(bytes) } as Deserializer<String>
        def redis_entry = mapCacheClient.get(mn, StringSerializer, StringDeserializer)
        if(redis_entry) {
            def entry = redis_entry.split(",")
            r.setValue("ts", entry[0])
            r.setValue("val", entry[2])
            new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
                new MarkupBuilder(osw).record {
                    r.schema.fieldNames.each {fieldName ->
                        "$fieldName" r.getValue(fieldName)
                    }
                    hash(mnhash.toString())
                }
            }
        }
        recordCount++
        WriteResult.of(1,[:])
    }
    @Override
    String getMimeType() {
        return "application/xml"
    }
    @Override
    WriteResult write(final RecordSet rs) throws IOException {
        int count = 0
        new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
            new MarkupBuilder(osw).recordSet {
                Record r
                while (r = rs.next()) {
                    count++
                    record {
                        rs.schema.fieldNames.each {fieldName ->
                            "$fieldName" r.getValue(fieldName)
                        }
                    }
                }
            }
        }
        WriteResult.of(count, [:])
    }
    void beginRecordSet() throws IOException {
    }
    @Override
    WriteResult finishRecordSet() throws IOException {
        return WriteResult.of(recordCount, [:])
    }
    @Override
    void close() throws IOException {
    }
    @Override
    void flush() throws IOException {
    }
}
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
    def mapCacheClient
    // Properties
    static final PropertyDescriptor CACHE_CLIENT = new PropertyDescriptor.Builder()
        .name("cache-client")
        .displayName("Cache Client")
        .description("Specifies the Controller Service to use for map cache")
        .defaultValue("35aa32f2-c57a-1a97-3b09-18186448fc78")
        .identifiesControllerService(DistributedMapCacheClient.class)
        .required(true)
        .build()
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        def properties = [] as ArrayList
        properties.add(CACHE_CLIENT)
        properties
    }
    @Override
    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
        mapCacheClient = getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
    }
    @Override
    RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        return null
    }
    @Override
    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
        return new GroovyRecordSetWriter(out, mapCacheClient)
    }
}
writer = new GroovyRecordSetWriterFactory()
The error we get:
2020-08-25 00:23:05,037 ERROR [Timer-Driven Process Thread-25]
o.a.n.processors.standard.ConvertRecord
ConvertRecord[id=35aa33e6-c57a-1a97-8b9f-87b432db3f4f] Failed to process
StandardFlowFileRecord[uuid=8018956e-43d5-446d-809a-721531affe0f,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1598299104924-27305628,
container=default, section=668], offset=517679,
length=21822],offset=0,name=a3bde615-4c45-4da4-9537-f5ecdd5aa6be,size=21822];
will route to failure: java.lang.NullPointerException: Cannot invoke
method get() on null object
The only "get()" is on line 69 above, def redis_entry =
mapCacheClient.get(mn, StringSerializer, StringDeserializer)
That means mapCacheClient is null, but we can't figure out why. It has
not helped that none of our attempts to add logging to the script have
worked.
My local Java guy was wondering whether the PropertyDescriptor definitions
need to be in the writer class itself rather than in the factory. We tried
that, but we get the same "null object" error no matter what we do.
The other question was whether we should be using
getSupportedDynamicPropertyDescriptor rather than
getSupportedPropertyDescriptors.
We will continue to experiment, but it is a bit like wandering in the
dark. Your help has been great...thank you.
Dave
------ Original Message ------
From: "Matt Burgess" <[email protected]>
To: [email protected]
Sent: 8/24/2020 4:43:12 PM
Subject: Re: Re[2]: access property in ScriptedRecordSetWriter?
The formatting got a bit wonky on the code snippet you provided, but
if your GroovyRecordSetWriterFactory extends
AbstractControllerService, it should have access to the getProperty()
method. Try without the context, just
"getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)"
On Mon, Aug 24, 2020 at 4:56 PM davide <[email protected]> wrote:
Matt,
So we learned some, but not enough.
This snippet of code:
@Override
def init(context) {
    mapCacheClient = context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
The problem seems to be that the "getProperty().asControllerService()" chain
only works on a ProcessorContext object, and the one we have here is actually
a ControllerServiceInitializationContext, so "getProperty()" doesn't work.
The ControllerServiceInitializationContext does have a "getProperties()"
method that returns a Map, but we have not been able to get
"asControllerService" to return a valid controller service. Everything we
tried just returns null.
This appears to be a difference between how a controller service is
initialized and how a processor is initialized. We have not found any specific
documentation on the controller service lifecycle. Are you aware of any?
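To illustrate the kind of ordering difference we suspect, here is a minimal
Groovy sketch. It uses stand-in classes only (FakeContext, LookupInInit,
LookupOnEnabled and runLifecycle are all hypothetical, not NiFi API), and it
assumes the framework calls init() before the property values are applied.
Under that assumption, a lookup done in init() sees null, while the same
lookup deferred to an enable-time hook sees the configured value:

```groovy
// Stand-in for a framework context; NOT a NiFi class.
class FakeContext {
    Map<String, Object> configured = [:]
    Object valueOf(String name) { configured[name] }
}

// Reads the property during init(), before anything is configured.
class LookupInInit {
    def cacheClient
    void init(FakeContext ctx) { cacheClient = ctx.valueOf('cache-client') }
    void onEnabled(FakeContext ctx) { /* nothing deferred to enable time */ }
}

// Defers the property read until the service is enabled.
class LookupOnEnabled {
    def cacheClient
    void init(FakeContext ctx) { /* no property access here */ }
    void onEnabled(FakeContext ctx) { cacheClient = ctx.valueOf('cache-client') }
}

// Assumed lifecycle: init first, then configuration, then enable.
def runLifecycle(service) {
    def ctx = new FakeContext()
    service.init(ctx)                                // nothing configured yet
    ctx.configured['cache-client'] = 'redis-client'  // user sets the property
    service.onEnabled(ctx)                           // property is now visible
    service
}

def early = runLifecycle(new LookupInInit())
def late = runLifecycle(new LookupOnEnabled())
assert early.cacheClient == null            // same symptom as our null object
assert late.cacheClient == 'redis-client'
```

In real NiFi the enable-time hook would presumably be a method annotated with
@OnEnabled that receives a ConfigurationContext. Whether the scripted writer
forwards that call to the script object is something we have not confirmed.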
Any further hints here? What are we missing?
Dave
--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/