Ugh, I remember now: I think you're running into [1], which still needs to
be done. As a workaround, you could build on the approach you already have:
specify the controller service ID as a property, then use
"getControllerServiceLookup" to find the controller service (CS) by ID.

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-5115
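
The workaround described above (specify the controller service ID as a
property, then find the service through "getControllerServiceLookup") could
be sketched roughly as follows, in Groovy to match the quoted script. The
helper name findServiceById is hypothetical. The sketch assumes that the
object passed as "lookup" is whatever getControllerServiceLookup() returns
inside the factory; it is left untyped so the snippet also runs outside NiFi
with a stand-in. It has not been tested against a running NiFi instance.

```groovy
// Hypothetical helper: resolve a controller service from its ID at use time.
// `lookup` is assumed to be the object returned by getControllerServiceLookup().
def findServiceById(lookup, String serviceId) {
    if (!serviceId?.trim()) {
        throw new IllegalArgumentException('No controller service ID was configured')
    }
    def service = lookup.getControllerService(serviceId.trim())
    if (service == null) {
        // Fail with a clear message instead of a later
        // "Cannot invoke method get() on null object".
        throw new IllegalStateException("No controller service found with ID ${serviceId.trim()}")
    }
    service
}

// Possible use from the factory (assumption: the ID string is available to
// the script, here hard-coded with the default value from the quoted code):
//   def client = findServiceById(getControllerServiceLookup(),
//                                '35aa32f2-c57a-1a97-3b09-18186448fc78')
```

To call it from inside the factory class, it would need to be declared as a
method of that class. Doing the lookup in createWriter() rather than in
init() may be the safer choice, since the referenced service might not be
resolvable yet when the script is initialized; that is an assumption, not
something confirmed in this thread.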


On Mon, Aug 24, 2020 at 8:34 PM David Early <[email protected]>
wrote:

> Matt,
>
> I apologize for returning to the well, but we are truly stuck.
>
> Here is what I have:
>
> Controller configuration with the dynamic property with the ID of the
> controller service:
>
>
> Full code as it stands right now:
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> import groovy.xml.MarkupBuilder
> import org.apache.nifi.components.PropertyDescriptor
> import org.apache.nifi.controller.AbstractControllerService
> import org.apache.nifi.controller.ControllerServiceInitializationContext
> import org.apache.nifi.flowfile.FlowFile
> import org.apache.nifi.logging.ComponentLog
> import org.apache.nifi.processor.ProcessSession
> import org.apache.nifi.reporting.InitializationException
> import org.apache.nifi.schema.access.SchemaNotFoundException
> import org.apache.nifi.serialization.RecordReaderFactory
> import org.apache.nifi.serialization.RecordSetWriter
> import org.apache.nifi.serialization.RecordSetWriterFactory
> import org.apache.nifi.serialization.WriteResult
> import org.apache.nifi.serialization.record.Record
> import org.apache.nifi.serialization.record.RecordSchema
> import org.apache.nifi.serialization.record.RecordSet
> import org.apache.nifi.stream.io.NonCloseableOutputStream
> import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
> import org.apache.nifi.distributed.cache.client.Serializer
> import org.apache.nifi.distributed.cache.client.Deserializer
> import org.codehaus.groovy.runtime.callsite.PerInstancePojoMetaClassSite
>
> import org.apache.nifi.distributed.cache.client.Serializer
> import org.apache.nifi.distributed.cache.client.Deserializer
>
> import java.nio.charset.StandardCharsets
> import org.apache.nifi.components.PropertyValue
>
> class GroovyRecordSetWriter extends AbstractControllerService implements RecordSetWriter {
>     private int recordCount = 0
>     private int HASHCNT = 2
>     private final OutputStream out
>     private final DistributedMapCacheClient mapCacheClient
>
>     GroovyRecordSetWriter(final OutputStream out, final mapCacheClient) {
>         this.out = out
>         this.mapCacheClient = mapCacheClient
>     }
>
>     @Override
>     WriteResult write(Record r) throws IOException {
>         String mn = r.getValue("mn").toString()
>
>         def mnhash = mn.toString().hashCode().mod(HASHCNT).abs()
>
>         // USE MAP CACHE TO LOOK UP CACHE
>         //cache is mn:(avg,cnt,et,lt)
>
>         // Serialize and deserialize cache keys/values as UTF-8 strings
>         def StringSerializer = {value, out ->
>             out.write(value.getBytes(StandardCharsets.UTF_8))} as Serializer<String>
>         def StringDeserializer = { bytes -> new String(bytes, StandardCharsets.UTF_8) } as Deserializer<String>
>
>         def redis_entry = mapCacheClient.get(mn, StringSerializer, StringDeserializer)
>
>         if(redis_entry) {
>             def entry = redis_entry.split(",")
>             r.setValue("ts", entry[0])
>             r.setValue("val", entry[2])
>
>             new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
>                 new MarkupBuilder(osw).record {
>                     r.schema.fieldNames.each {fieldName ->
>                         "$fieldName" r.getValue(fieldName)
>                     }
>                     hash(mnhash.toString())
>                 }
>             }
>
>         }
>
>         recordCount++
>         WriteResult.of(1,[:])
>     }
>
>     @Override
>     String getMimeType() {
>         return "application/xml"
>     }
>
>     @Override
>     WriteResult write(final RecordSet rs) throws IOException {
>         int count = 0
>         new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
>             new MarkupBuilder(osw).recordSet {
>                 Record r
>                 while (r = rs.next()) {
>                     count++
>                     record {
>                         rs.schema.fieldNames.each {fieldName ->
>                             "$fieldName" r.getValue(fieldName)
>                         }
>                     }
>                 }
>             }
>         }
>         WriteResult.of(count, [:])
>     }
>
>     void beginRecordSet() throws IOException {
>     }
>
>     @Override
>     WriteResult finishRecordSet() throws IOException {
>         return WriteResult.of(recordCount, [:])
>     }
>
>     @Override
>     void close() throws IOException {
>     }
>
>     @Override
>     void flush() throws IOException {
>     }
> }
>
> class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
>
>     def mapCacheClient
>
>     // Properties
>     static final PropertyDescriptor CACHE_CLIENT = new PropertyDescriptor.Builder()
>             .name("cache-client")
>             .displayName("Cache Client")
>             .description("Specifies the Controller Service to use for map cache")
>             .defaultValue("35aa32f2-c57a-1a97-3b09-18186448fc78")
>             .identifiesControllerService(DistributedMapCacheClient.class)
>             .required(true)
>             .build()
>
>     @Override
>     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
>         def properties = [] as ArrayList
>         properties.add(CACHE_CLIENT)
>         properties
>     }
>
>     @Override
>     protected void init(ControllerServiceInitializationContext config) throws InitializationException {
>         mapCacheClient = getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
>     }
>
>     @Override
>     RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
>         return null
>     }
>
>
>     @Override
>     RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
>         return new GroovyRecordSetWriter(out, mapCacheClient)
>     }
>
> }
>
> writer = new GroovyRecordSetWriterFactory()
>
> The error we get:
>
> 2020-08-25 00:23:05,037 ERROR [Timer-Driven Process Thread-25]
> o.a.n.processors.standard.ConvertRecord
> ConvertRecord[id=35aa33e6-c57a-1a97-8b9f-87b432db3f4f] Failed to process
> StandardFlowFileRecord[uuid=8018956e-43d5-446d-809a-721531affe0f,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1598299104924-27305628,
> container=default, section=668], offset=517679,
> length=21822],offset=0,name=a3bde615-4c45-4da4-9537-f5ecdd5aa6be,size=21822];
> will route to failure: java.lang.NullPointerException: Cannot invoke method
> get() on null object
>
> The only "get()" is on line 69 above: def redis_entry =
> mapCacheClient.get(mn, StringSerializer, StringDeserializer)
>
> Basically, that means that the mapCacheClient is null, but we can't figure
> out what is broken.  It has not helped that none of our attempts to add
> logging have worked.
>
> My local Java guy was wondering if the PropertyDescriptor definitions
> needed to be in the writer class itself rather than in the factory.  We
> tried, but get the same "null object" error no matter what we do.
>
> The other question was whether we should use
> getSupportedDynamicPropertyDescriptor rather than
> getSupportedPropertyDescriptors.
>
> We will continue to experiment, but it is a bit like wandering in the
> dark.  Your help has been great...thank you.
>
> Dave
>
>
> ------ Original Message ------
> From: "Matt Burgess" <[email protected]>
> To: [email protected]
> Sent: 8/24/2020 4:43:12 PM
> Subject: Re: Re[2]: access property in ScriptedRecordSetWriter?
>
> The formatting got a bit wonky on the code snippet you provided, but
> if your GroovyRecordSetWriterFactory extends
> AbstractControllerService, it should have access to the getProperty()
> method. Try without the context, just
> "getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)"
>
> On Mon, Aug 24, 2020 at 4:56 PM davide <[email protected]> wrote:
>
>
> Matt,
>
> So we learned some....but not enough there.
>
> This snippet of code:
>
> @Override
> def init(context) {
>     mapCacheClient = context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
> }
>
> The problem seems to be that the "getProperty().asControllerService()" set
> of methods only works on a ProcessorContext object, and the one we have
> here is actually a ControllerServiceInitializationContext, so
> "getProperty()" doesn't work. The ControllerServiceInitializationContext
> does have a "getProperties()" method that returns a Map, but we have not
> been able to get the "asControllerService" method to return a valid
> controller service. Everything we tried just returns null.
>
> This appears to be a difference in how a controller service is initialized
> and how a processor is initialized. We have not found any specific
> documentation on the controller service process; are you aware of any?
>
> Any further hints here? What are we missing?
>
> Dave
>
>
>
>
>
>
