You would probably be better off implementing your own controller
service with the same interface.
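
As a rough sketch of that idea, in plain Java so it stands alone: the
processors only see an interface, so a second implementation that signs
requests can be dropped in without touching them. The names below
(SearchClientService, SignedSearchClientService) are made up for
illustration and are not the real NiFi interface, and the signer function
is only a placeholder for the AWS request-signing interceptor in your
sample.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the interface the REST API processors depend on.
// The real NiFi interface has different methods; this only shows the shape.
interface SearchClientService {
    Map<String, String> index(String indexPath, String jsonDocument);
}

// A second implementation of the same interface. Code written against
// SearchClientService can use it without any change of its own.
final class SignedSearchClientService implements SearchClientService {
    // Placeholder for request signing: maps a request path to extra headers.
    private final Function<String, Map<String, String>> signer;

    SignedSearchClientService(Function<String, Map<String, String>> signer) {
        this.signer = signer;
    }

    @Override
    public Map<String, String> index(String indexPath, String jsonDocument) {
        // A real service would send the request. This sketch only returns
        // the headers it would attach, so the flow can be inspected.
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Content-Type", "application/json");
        headers.putAll(signer.apply(indexPath));
        return headers;
    }
}
```

In a real controller service the signer would be replaced by a RestClient
built with the signing interceptor, and the credentials would come from
whatever credentials provider the service is configured with.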

On Wed, Sep 9, 2020 at 10:13 PM sanjeet rath <> wrote:
> Thank you, Mike, for the quick reply.
> I was really struggling with this functionality.
> I have gone through the code. What I understood is that I should use the
> "nifi-elastic-search-restapi-processor" project.
> In it, the JsonQueryElasticsearch processor uses the "Client Service"
> controller service, and I need to modify this controller service to use the
> AWS code which I shared with you earlier in this mail chain.
> Is my understanding correct?
> Regards,
> Sanjeet
> On Thu, Sep 10, 2020 at 3:18 AM Mike Thomsen <> wrote:
>> Sanjeet,
>> As provided, this won't integrate well with the existing NiFi
>> processors. You would need to implement it as a controller service
>> object and update the processors to use it. Also, if you want to use
>> processors based on the official Elasticsearch client API, the ones
>> under the "REST API bundle" are the best fit because they already use
>> controller services that use the official Elastic clients.
>> Thanks,
>> Mike
>> On Wed, Sep 9, 2020 at 12:14 PM sanjeet rath <> wrote:
>> >
>> > Hi,
>> >
>> > We are using AWS-managed Elasticsearch, and our NiFi is hosted on EC2.
>> > I have a use case for building a custom processor on top of
>> > PutElasticsearchHttp that uses the IAM role based AWS credentials
>> > provider controller service to connect to AWS Elasticsearch.
>> > This would be similar to PutSQS, where we use the IAM role based AWS
>> > credentials provider service to connect to SQS, and it is working fine.
>> >
>> > But no AWS credentials provider controller service is available in
>> > PutElasticsearchHttp.
>> >
>> > So my plan is to add an AWS credentials provider controller service to
>> > PutElasticsearchHttp, where I will use the code below to connect to
>> > Elasticsearch.
>> >
>> > Is my approach correct? Could you suggest a better approach?
>> >
>> > import java.io.IOException;
>> >
>> > import com.amazonaws.auth.AWS4Signer;
>> > import com.amazonaws.auth.AWSCredentialsProvider;
>> > import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
>> > import com.amazonaws.http.AWSRequestSigningApacheInterceptor;
>> > import org.apache.http.HttpEntity;
>> > import org.apache.http.HttpHost;
>> > import org.apache.http.HttpRequestInterceptor;
>> > import org.apache.http.entity.ContentType;
>> > import org.apache.http.nio.entity.NStringEntity;
>> > import org.elasticsearch.client.Request;
>> > import org.elasticsearch.client.Response;
>> > import org.elasticsearch.client.RestClient;
>> >
>> > public class AmazonElasticsearchServiceSample {
>> >     private static String serviceName = "es";
>> >     private static String region = "us-west-1";
>> >     private static String aesEndpoint = "";
>> >     private static String payload = "{ \"type\": \"s3\", \"settings\": { "
>> >             + "\"bucket\": \"your-bucket\", \"region\": \"us-west-1\", "
>> >             + "\"role_arn\": \"arn:aws:iam::123456789012:role/TheServiceRole\" } }";
>> >     private static String snapshotPath = "/_snapshot/my-snapshot-repo";
>> >     private static String sampleDocument = "{" + "\"title\":\"Walk the Line\","
>> >             + "\"director\":\"James Mangold\"," + "\"year\":\"2005\"}";
>> >     private static String indexingPath = "/my-index/_doc";
>> >     static final AWSCredentialsProvider credentialsProvider =
>> >             new DefaultAWSCredentialsProviderChain();
>> >
>> >     public static void main(String[] args) throws IOException {
>> >         RestClient esClient = esClient(serviceName, region);
>> >
>> >         // Register a snapshot repository
>> >         HttpEntity entity = new NStringEntity(payload, ContentType.APPLICATION_JSON);
>> >         Request request = new Request("PUT", snapshotPath);
>> >         request.setEntity(entity);
>> >         // request.addParameter(name, value); // optional parameters
>> >         Response response = esClient.performRequest(request);
>> >         System.out.println(response.toString());
>> >
>> >         // Index a document
>> >         entity = new NStringEntity(sampleDocument, ContentType.APPLICATION_JSON);
>> >         String id = "1";
>> >         request = new Request("PUT", indexingPath + "/" + id);
>> >         request.setEntity(entity);
>> >         // Using a String instead of an HttpEntity sets Content-Type to
>> >         // application/json automatically.
>> >         // request.setJsonEntity(sampleDocument);
>> >         response = esClient.performRequest(request);
>> >         System.out.println(response.toString());
>> >     }
>> >
>> >     public static RestClient esClient(String serviceName, String region) {
>> >         AWS4Signer signer = new AWS4Signer();
>> >         signer.setServiceName(serviceName);
>> >         signer.setRegionName(region);
>> >         HttpRequestInterceptor interceptor =
>> >                 new AWSRequestSigningApacheInterceptor(serviceName, signer, credentialsProvider);
>> >         return RestClient.builder(HttpHost.create(aesEndpoint))
>> >                 .setHttpClientConfigCallback(hacb -> hacb.addInterceptorLast(interceptor))
>> >                 .build();
>> >     }
>> > }
>> >
>> >
>> >
>> >
>> > Regards,
>> > Sanjeet
>> >
>> > --
>> > Sanjeet Kumar Rath,
>> > mob- +91 8777577470
>> >
