Oops copy paste error, the GroovyScriptedRecordSetWriterFactory has to extend 
AbstractControllerService

Sent from my iPhone

> On Aug 21, 2020, at 4:50 PM, David Early <[email protected]> wrote:
> 
> 
> Matt,
> 
> This is very cool of you, and I feel like this is close, but once again 
> hanging up on my inexperience with the Java environment for getting the pipes 
> all lined up.
> 
> Below is my entire code.  Note you originally had :
>   @Override
>         def init(context) {
>            mapCacheClient =
>                 
> context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
>    }
> 
> But I think that should be "initialize" based on an examination of the 
> AbstractControllerService code.  
> 
> That said, this is giving me the following error when trying to start the 
> controller:
> <g0ezr3fw.png>
> 
> And it gives that whether I use "init" or "initialize"
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> import groovy.xml.MarkupBuilder
> import org.apache.nifi.controller.AbstractControllerService
> import org.apache.nifi.flowfile.FlowFile
> import org.apache.nifi.logging.ComponentLog
> import org.apache.nifi.processor.ProcessSession
> import org.apache.nifi.schema.access.SchemaNotFoundException
> import org.apache.nifi.serialization.RecordReaderFactory
> import org.apache.nifi.serialization.RecordSetWriter
> import org.apache.nifi.serialization.RecordSetWriterFactory
> import org.apache.nifi.serialization.WriteResult
> import org.apache.nifi.serialization.record.Record
> import org.apache.nifi.serialization.record.RecordSchema
> import org.apache.nifi.serialization.record.RecordSet
> import org.apache.nifi.stream.io.NonCloseableOutputStream
> import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
> import org.apache.nifi.distributed.cache.client.Serializer
> import org.apache.nifi.distributed.cache.client.Deserializer
> import java.nio.charset.StandardCharsets
> import org.apache.nifi.processor.ProcessContext
> 
> class GroovyRecordSetWriter implements RecordSetWriter {
>     private int recordCount = 0;
>     private int HASHCNT = 2;
>     private int PERIOD = 300000;
>     private int PERC_OFFSET = 0.95;
>     private final OutputStream out;
>     private final DistributedMapCacheClient mapCacheClient
> 
>     public GroovyRecordSetWriter(final OutputStream out) {
>         this.out = out;
>         this.mapCacheClient = mapCacheClient
>     }
> 
>     @Override
>     WriteResult write(Record r) throws IOException {
>         def mnhash = r.getValue('mn').toString().hashCode().mod(HASHCNT).abs()
> 
>         // USE MAP CACHE TO LOOK UP CACHE
>         // MODIFY RECORD INFO BASED ON ANALYSIS
>         new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
>             new MarkupBuilder(osw).record {
>                 r.schema.fieldNames.each {fieldName ->
>                     "$fieldName" r.getValue(fieldName)
>                 }
>                 hash(mnhash.toString())
>             }
>         };
> 
>         recordCount++;
>         WriteResult.of(1,[:])
>     }
> 
>     @Override
>     String getMimeType() {
>         return 'application/xml'
>     }
> 
>     @Override
>     WriteResult write(final RecordSet rs) throws IOException {
>         int count = 0
>         new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
>             new MarkupBuilder(osw).recordSet {
>                 Record r
>                 while (r = rs.next()) {
>                     count++
>                     record {
>                         rs.schema.fieldNames.each {fieldName ->
>                             "$fieldName" r.getValue(fieldName)
>                         }
>                     }
>                 }
>             }
>         }
>         WriteResult.of(count, [:])
>     }
> 
>     public void beginRecordSet() throws IOException {
>     }
> 
>     @Override
>     public WriteResult finishRecordSet() throws IOException {
>         return WriteResult.of(recordCount, [:]);
>     }
> 
>     @Override
>     public void close() throws IOException {
>     }
> 
>     @Override
>     public void flush() throws IOException {
>     }
> }
> 
> class GroovyRecordSetWriterFactory extends AbstractControllerService 
> implements RecordSetWriterFactory {
> 
>     // Properties
>     def mapCacheClient
> 
>     // Properties
>     static final PropertyDescriptor CACHE_CLIENT = new 
> PropertyDescriptor.Builder()
>         .name("cache-client")
>         .displayName("Cache Client")
>         .description("Specifies the Controller Service to use for map cache")
>         .identifiesControllerService(DistributedMapCacheClient.class)
>         .required(true)
>         .build()
> 
>     @Override
>     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
>         def properties = [] as ArrayList
>         properties.add(CACHE_CLIENT)
>         properties
>     }
> 
>     @Override
>     def initialize(context) {
>         mapCacheClient =
>                 
> context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
>     }
> 
>     @Override
>     RecordSchema getSchema(Map<String, String> variables, RecordSchema 
> readSchema) throws SchemaNotFoundException, IOException {
>         return null
>     }
> 
>     @Override
>     RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, 
> OutputStream out, Map<String, String> variables) throws 
> SchemaNotFoundException, IOException {
>         return new GroovyRecordSetWriter(out, mapCacheClient)
>     }
> 
>     //
> 
> }
> 
> writer = new GroovyRecordSetWriterFactory()
> 

Reply via email to