Matt,

The method "initialization" in AbstractControllerService is this:

@Override
public final void initialize(final ControllerServiceInitializationContext 
context) throws InitializationException {
this.identifier = context.getIdentifier();
serviceLookup = context.getControllerServiceLookup();
logger = context.getLogger();
stateManager = context.getStateManager();
nodeTypeProvider = context.getNodeTypeProvider();
init(context);
}
My (limited) understanding is that if this is labeled "final", then I cannot do an @Override on it. Is this why the thing is hung up on my attempt to override "initialization"?

Dave


------ Original Message ------
From: "Matt Burgess" <[email protected]>
To: [email protected]
Sent: 8/21/2020 3:39:24 PM
Subject: Re: access property in ScriptedRecordSetWriter?

Oops copy paste error, the GroovyScriptedRecordSetWriterFactory has to extend AbstractControllerService

Sent from my iPhone

On Aug 21, 2020, at 4:50 PM, David Early <[email protected]> wrote:


Matt,

This is very cool of you, and I feel like this is close, but once again hanging up on my inexperience with the Java environment for getting the pipes all lined up.

Below is my entire code.  Note you originally had :
  @Override
        def init(context) {
           mapCacheClient =
context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
   }

But I think that should be "initialize" based on an examination of the AbstractControllerService code.

That said, this is giving me the following error when trying to start the controller:
<g0ezr3fw.png>

And it gives that whether I use "init" or "initialize"
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import groovy.xml.MarkupBuilder
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.distributed.cache.client.Deserializer
import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.ProcessContext

class GroovyRecordSetWriter implements RecordSetWriter {
private int recordCount = 0;
private int HASHCNT = 2;
private int PERIOD = 300000;
private int PERC_OFFSET = 0.95;
private final OutputStream out;
private final DistributedMapCacheClient mapCacheClient

public GroovyRecordSetWriter(final OutputStream out) {
this.out = out;
this.mapCacheClient = mapCacheClient
}

@Override
WriteResult write(Record r) throws IOException {
def mnhash = r.getValue('mn').toString().hashCode().mod(HASHCNT).abs()

// USE MAP CACHE TO LOOK UP CACHE
        // MODIFY RECORD INFO BASED ON ANALYSIS
new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
new MarkupBuilder(osw).record {
r.schema.fieldNames.each {fieldName ->
"$fieldName" r.getValue(fieldName)
}
hash(mnhash.toString())
}
        };

recordCount++;
        WriteResult.of(1,[:])
    }

@Override
String getMimeType() {
return 'application/xml'
}

@Override
WriteResult write(final RecordSet rs) throws IOException {
int count = 0
new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
new MarkupBuilder(osw).recordSet {
Record r
while (r = rs.next()) {
                    count++
                    record {
rs.schema.fieldNames.each {fieldName ->
"$fieldName" r.getValue(fieldName)
}
                    }
}
}
        }
WriteResult.of(count, [:])
    }

public void beginRecordSet() throws IOException {
    }

@Override
public WriteResult finishRecordSet() throws IOException {
return WriteResult.of(recordCount, [:]);
    }

@Override
public void close() throws IOException {
    }

@Override
public void flush() throws IOException {
    }
}

class GroovyRecordSetWriterFactory extends AbstractControllerService implements 
RecordSetWriterFactory {

// Properties
def mapCacheClient

// Properties
static final PropertyDescriptor CACHE_CLIENT = new PropertyDescriptor.Builder()
        .name("cache-client")
        .displayName("Cache Client")
        .description("Specifies the Controller Service to use for map cache")
        .identifiesControllerService(DistributedMapCacheClient.class)
        .required(true)
        .build()

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
def properties = [] as ArrayList
        properties.add(CACHE_CLIENT)
        properties
    }

@Override
def initialize(context) {
mapCacheClient =
                
context.getProperty(CACHE_CLIENT).asControllerService(DistributedMapCacheClient)
    }

@Override
RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) 
throws SchemaNotFoundException, IOException {
return null
}

@Override
RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream 
out, Map<String, String> variables) throws SchemaNotFoundException, IOException 
{
return new GroovyRecordSetWriter(out, mapCacheClient)
    }

//

}

writer = new GroovyRecordSetWriterFactory()

Reply via email to