[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515678#comment-16515678 ] ASF GitHub Bot commented on NIFI-4982: -- Github user zenfenan commented on the issue: https://github.com/apache/nifi/pull/2558 Thanks @ijokarumawak and @mattyb149 for the review. Thanks @pvillard31 Everything looks good. Merged to master. > Add a DistributedMapCacheLookupService > -- > > Key: NIFI-4982 > URL: https://issues.apache.org/jira/browse/NIFI-4982 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Attachments: distributedMapCacheLookup.xml > > > Add a new lookup controller service that takes a Distributed Map Cache client > to lookup for values based on a key. This allows users to leverage the > internal Disitributed Map Cache server and/or the Redis to perform lookup > access (with LookupRecord processor for instance). > Attached is a template to test the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515674#comment-16515674 ] ASF GitHub Bot commented on NIFI-4982: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2558 > Add a DistributedMapCacheLookupService > -- > > Key: NIFI-4982 > URL: https://issues.apache.org/jira/browse/NIFI-4982 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Attachments: distributedMapCacheLookup.xml > > > Add a new lookup controller service that takes a Distributed Map Cache client > to lookup for values based on a key. This allows users to leverage the > internal Disitributed Map Cache server and/or the Redis to perform lookup > access (with LookupRecord processor for instance). > Attached is a template to test the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515489#comment-16515489 ] ASF GitHub Bot commented on NIFI-4982: -- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2558#discussion_r195999042 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.expression.ExpressionLanguageScope; + +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"}) +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. " ++ "The coordinates that are passed to the lookup must contain the key 'key'.") +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService { + +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String KEY = "key"; +private static final Set REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet()); + +private volatile DistributedMapCacheClient cache; +private volatile static Charset charset; +private final Serializer keySerializer = new StringSerializer(); +private final Deserializer valueDeserializer = new StringDeserializer(); + +public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() +.name("distributed-map-cache-service") +.displayName("Distributed Cache Service") +.description("The Controller Service that is used to get the cached values.") +.required(true) +.identifiesControllerService(DistributedMapCacheClient.class) +.build(); + +public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder() +.name("character-encoding") +.displayName("Character Encoding") +.description("Specifies a character encoding to use.") +.required(true) +.allowableValues(getStandardCharsetNames()) +.defaultValue(StandardCharsets.UTF_8.displayName()) +.build(); + +private static Set getStandardCharsetNames
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515291#comment-16515291 ] ASF GitHub Bot commented on NIFI-4982: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/2558 No further comments from me, I'm +1. @zenfenan please merge. Thanks! > Add a DistributedMapCacheLookupService > -- > > Key: NIFI-4982 > URL: https://issues.apache.org/jira/browse/NIFI-4982 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Attachments: distributedMapCacheLookup.xml > > > Add a new lookup controller service that takes a Distributed Map Cache client > to lookup for values based on a key. This allows users to leverage the > internal Disitributed Map Cache server and/or the Redis to perform lookup > access (with LookupRecord processor for instance). > Attached is a template to test the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515114#comment-16515114 ] ASF GitHub Bot commented on NIFI-4982: -- Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2558#discussion_r195933469 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.expression.ExpressionLanguageScope; + +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"}) +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. " ++ "The coordinates that are passed to the lookup must contain the key 'key'.") +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService { + +private static final List STANDARD_CHARSETS = Arrays.asList( +StandardCharsets.UTF_8, +StandardCharsets.US_ASCII, +StandardCharsets.ISO_8859_1, +StandardCharsets.UTF_16, +StandardCharsets.UTF_16LE, +StandardCharsets.UTF_16BE); + +private static final String KEY = "key"; +private static final Set REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet()); + +private volatile DistributedMapCacheClient cache; +private volatile static Charset charset; +private final Serializer keySerializer = new StringSerializer(); +private final Deserializer valueDeserializer = new StringDeserializer(); + +public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() +.name("distributed-map-cache-service") +.displayName("Distributed Cache Service") +.description("The Controller Service that is used to get the cached values.") +.required(true) +.identifiesControllerService(DistributedMapCacheClient.class) +.build(); + +public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder() +.name("character-encoding") +.displayName("Character Encoding") +.description("Specifies a character encoding to use.") +.required(true) +.allowableValues(getStandardCharsetNames()) +.defaultValue(StandardCharsets.UTF_8.displayName()) +.build(); + +private static Set getStandardCharsetNames()
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510691#comment-16510691 ] ASF GitHub Bot commented on NIFI-4982: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/2558#discussion_r194971580 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"}) +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. " ++ "The coordinates that are passed to the lookup must contain the key 'key'.") +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService { +private static final String KEY = "key"; --- End diff -- LookupAttribute processor loops through dynamic properties to fetch multiple keys, but it only supports single coordinates. |Dynamic property name|Dynamic property value|Result| |||--| |resultA|`${attributeAKey}`|Lookup cache with `attributeAKey` FlowFile attribute value and put it to `resultA` attribute| |resultB|`${attributeBKey}`|Lookup cache with `attributeBKey` FlowFile attribute value and put it to `resultB` attribute| Since this LookupService implements StringLookupService, I think it is correct to require a "key" to fetch the String. > Add a DistributedMapCacheLookupService > -- > > Key: NIFI-4982 > URL: https://issues.apache.org/jira/browse/NIFI-4982 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Attachments: distributedMapCacheLookup.xml > > > Add a new lookup controller service that takes a Distributed Map Cache client > to lookup for values based on a key. This allows users to leverage the > internal Disitributed Map Cache server and/or the Redis to perform lookup > access (with LookupRecord processor for instance). > Attached is a template to test the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433987#comment-16433987 ] ASF GitHub Bot commented on NIFI-4982: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2558#discussion_r180771987 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"}) +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. " ++ "The coordinates that are passed to the lookup must contain the key 'key'.") +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService { +private static final String KEY = "key"; +private static final Set REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet()); + +private volatile DistributedMapCacheClient cache; +private final Serializer keySerializer = new StringSerializer(); +private final Deserializer valueDeserializer = new StringDeserializer(); + +public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() +.name("distributed-map-cache-service") +.displayName("Distributed Cache Service") +.description("The Controller Service that is used to get the cached values.") +.required(true) +.identifiesControllerService(DistributedMapCacheClient.class) +.build(); + +@Override +protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { +return new PropertyDescriptor.Builder() +.name(propertyDescriptorName) +.required(false) +.dynamic(true) +.addValidator(Validator.VALID) +.expressionLanguageSupported(true) +.build(); +} + +@OnEnabled +public void cacheConfiguredValues(final ConfigurationContext context) { +cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); +} + +@Override +protected List getSupportedPropertyDescriptors() { +final List descriptors = new ArrayList<>(); +descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE); +return descriptors; +} + +@Override +public Optional lookup(final Map coordinates) { +if (coordinates ==
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433984#comment-16433984 ] ASF GitHub Bot commented on NIFI-4982: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2558#discussion_r180771727 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"}) +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. " ++ "The coordinates that are passed to the lookup must contain the key 'key'.") +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService { +private static final String KEY = "key"; --- End diff -- Does requiring a single key "key" mean we can only do one lookup at a time? Perhaps we should not require any keys and let the dynamic properties define the lookup keys? > Add a DistributedMapCacheLookupService > -- > > Key: NIFI-4982 > URL: https://issues.apache.org/jira/browse/NIFI-4982 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Attachments: distributedMapCacheLookup.xml > > > Add a new lookup controller service that takes a Distributed Map Cache client > to lookup for values based on a key. This allows users to leverage the > internal Disitributed Map Cache server and/or the Redis to perform lookup > access (with LookupRecord processor for instance). > Attached is a template to test the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433986#comment-16433986 ] ASF GitHub Bot commented on NIFI-4982: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2558#discussion_r180772442 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.lookup; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestDistributedMapCacheLookupService { + +final static Optional EMPTY_STRING = Optional.empty(); + +@Test +public void testDistributedMapCacheLookupService() throws InitializationException { +final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); +final DistributedMapCacheLookupService service = new DistributedMapCacheLookupService(); +final DistributedMapCacheClient client = new DistributedMapCacheClientImpl(); + +runner.addControllerService("client", client); +runner.addControllerService("lookup-service", service); +runner.setProperty(service, DistributedMapCacheLookupService.PROP_DISTRIBUTED_CACHE_SERVICE, "client"); + +runner.enableControllerService(client); +runner.enableControllerService(service); + +runner.assertValid(service); + +assertThat(service, instanceOf(LookupService.class)); + +final Optional get = service.lookup(Collections.singletonMap("key", "myKey")); +assertEquals(Optional.of("myValue"), get); + +final Optional absent = service.lookup(Collections.singletonMap("key", "absentKey")); +assertEquals(EMPTY_STRING, absent); +} + +static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { + +private Map map = new HashMap(); + +@OnEnabled +public void onEnabled(final ConfigurationContext context) { +map.put("myKey", "myValue"); +} + +@Override +public void close() throws IOException { +} + +@Override +public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { +} + +@Override +protected java.util.List getSupportedPropertyDescriptors() { +return new ArrayList<>(); +} + +@Override +public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { +throw new IOException("not implemented"); --- End diff -- Nitpick, but these should probably be UnsupportedOperationExceptions :P >
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433985#comment-16433985 ] ASF GitHub Bot commented on NIFI-4982: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2558#discussion_r180771298 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"}) +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. " ++ "The coordinates that are passed to the lookup must contain the key 'key'.") +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService { +private static final String KEY = "key"; +private static final Set REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet()); + +private volatile DistributedMapCacheClient cache; +private final Serializer keySerializer = new StringSerializer(); +private final Deserializer valueDeserializer = new StringDeserializer(); + +public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() +.name("distributed-map-cache-service") +.displayName("Distributed Cache Service") +.description("The Controller Service that is used to get the cached values.") +.required(true) +.identifiesControllerService(DistributedMapCacheClient.class) +.build(); + +@Override +protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { +return new PropertyDescriptor.Builder() +.name(propertyDescriptorName) +.required(false) +.dynamic(true) +.addValidator(Validator.VALID) +.expressionLanguageSupported(true) --- End diff -- As of your PR #2205 , this should be changed to indicate the scope :) > Add a DistributedMapCacheLookupService > -- > > Key: NIFI-4982 > URL: https://issues.apache.org/jira/browse/NIFI-4982 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Attachments: distributedMapCacheLookup.xml > > > Add a new lookup controller service that takes a Distributed Map Cache client > to looku
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401664#comment-16401664 ] ASF GitHub Bot commented on NIFI-4982: -- Github user pvillard31 commented on the issue: https://github.com/apache/nifi/pull/2558 A template to help reviewing/testing this PR is available in the JIRA: https://issues.apache.org/jira/secure/attachment/12914839/distributedMapCacheLookup.xml > Add a DistributedMapCacheLookupService > -- > > Key: NIFI-4982 > URL: https://issues.apache.org/jira/browse/NIFI-4982 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Attachments: distributedMapCacheLookup.xml > > > Add a new lookup controller service that takes a Distributed Map Cache client > to lookup for values based on a key. This allows users to leverage the > internal Disitributed Map Cache server and/or the Redis to perform lookup > access (with LookupRecord processor for instance). > Attached is a template to test the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4982) Add a DistributedMapCacheLookupService
[ https://issues.apache.org/jira/browse/NIFI-4982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401661#comment-16401661 ] ASF GitHub Bot commented on NIFI-4982: -- GitHub user pvillard31 opened a pull request: https://github.com/apache/nifi/pull/2558 NIFI-4982 - Add a DistributedMapCacheLookupService Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pvillard31/nifi mapcachelookup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2558.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2558 commit f13e83043ce1c86df933d182902c3bba10a8c0c1 Author: Pierre Villard Date: 2018-03-15T20:28:26Z NIFI-4982 - Add a DistributedMapCacheLookupService > Add a DistributedMapCacheLookupService > -- > > Key: NIFI-4982 > URL: https://issues.apache.org/jira/browse/NIFI-4982 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Reporter: Pierre Villard >Assignee: Pierre Villard >Priority: Major > Attachments: distributedMapCacheLookup.xml > > > Add a new lookup controller service that takes a Distributed Map Cache client > to lookup for values based on a key. This allows users to leverage the > internal Disitributed Map Cache server and/or the Redis to perform lookup > access (with LookupRecord processor for instance). > Attached is a template to test the PR. -- This message was sent by Atlassian JIRA (v7.6.3#76005)