NIFI-2777: NIFI-2856: - Only performing response merging when the node is the cluster cooridinator even if there is a single response. - Fixing PropertyDescriptor merging to ensure the 'choosen' descriptor is included in map of all responses.
This closes #1095. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0b1d15a7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0b1d15a7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0b1d15a7 Branch: refs/heads/support/nifi-1.0.x Commit: 0b1d15a786a46dfb569197f97766e421b49096ac Parents: 47b368f Author: Matt Gilman <matt.c.gil...@gmail.com> Authored: Tue Oct 4 14:52:18 2016 -0400 Committer: jpercivall <jperciv...@apache.org> Committed: Wed Dec 14 16:40:52 2016 -0500 ---------------------------------------------------------------------- .../coordination/http/HttpResponseMapper.java | 65 ++++++ .../coordination/http/HttpResponseMerger.java | 65 ------ .../http/StandardHttpResponseMapper.java | 228 +++++++++++++++++++ .../http/StandardHttpResponseMerger.java | 227 ------------------ .../StandardAsyncClusterResponse.java | 22 +- .../ThreadPoolRequestReplicator.java | 26 +-- .../node/NodeClusterCoordinator.java | 37 +-- .../manager/PropertyDescriptorDtoMerger.java | 2 +- .../http/StandardHttpResponseMapperSpec.groovy | 217 ++++++++++++++++++ .../http/StandardHttpResponseMergerSpec.groovy | 218 ------------------ 10 files changed, 555 insertions(+), 552 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0b1d15a7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java new file mode 100644 index 0000000..659f5e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http; + +import org.apache.nifi.cluster.manager.NodeResponse; + +import java.net.URI; +import java.util.Set; + +/** + * <p> + * An HttpResponseMapper is responsible for taking the responses from all nodes in a cluster + * and distilling them down to a single response that would be appropriate to respond with, to the + * user/client who made the original web requests. + * </p> + */ +public interface HttpResponseMapper { + + /** + * Maps the responses from all nodes in the cluster to a single NodeResponse object that + * is appropriate to respond with + * + * @param uri the URI of the web request that was made + * @param httpMethod the HTTP Method that was used when making the request + * @param nodeResponses the responses received from the individual nodes + * + * @return a single NodeResponse that represents the response that should be returned to the user/client + */ + NodeResponse mapResponses(URI uri, String httpMethod, Set<NodeResponse> nodeResponses, boolean merge); + + /** + * Returns a subset (or equal set) of the given Node Responses, such that all of those returned are the responses + * that indicate that the node was unable to fulfill the request + * + * @param allResponses the responses to filter + * + * @return a subset (or equal set) of the given Node Responses, such that all of those returned are the responses + * that indicate that the node was unable to fulfill the request + */ + Set<NodeResponse> getProblematicNodeResponses(Set<NodeResponse> allResponses); + + /** + * Indicates whether or not the responses from nodes for the given URI & HTTP method must be interpreted in order to merge them + * + * @param uri the URI of the request + * @param httpMethod the HTTP Method of the request + * @return <code>true</code> if the response must be interpreted, <code>false</code> otherwise + */ + boolean isResponseInterpreted(URI uri, String httpMethod); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b1d15a7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java deleted file mode 100644 index 6102b74..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.cluster.coordination.http; - -import java.net.URI; -import java.util.Set; - -import org.apache.nifi.cluster.manager.NodeResponse; - -/** - * <p> - * An HttpResponseMapper is responsible for taking the responses from all nodes in a cluster - * and distilling them down to a single response that would be appropriate to respond with, to the - * user/client who made the original web requests. - * </p> - */ -public interface HttpResponseMerger { - - /** - * Maps the responses from all nodes in the cluster to a single NodeResponse object that - * is appropriate to respond with - * - * @param uri the URI of the web request that was made - * @param httpMethod the HTTP Method that was used when making the request - * @param nodeResponses the responses received from the individual nodes - * - * @return a single NodeResponse that represents the response that should be returned to the user/client - */ - NodeResponse mergeResponses(URI uri, String httpMethod, Set<NodeResponse> nodeResponses); - - /** - * Returns a subset (or equal set) of the given Node Responses, such that all of those returned are the responses - * that indicate that the node was unable to fulfill the request - * - * @param allResponses the responses to filter - * - * @return a subset (or equal set) of the given Node Responses, such that all of those returned are the responses - * that indicate that the node was unable to fulfill the request - */ - Set<NodeResponse> getProblematicNodeResponses(Set<NodeResponse> allResponses); - - /** - * Indicates whether or not the responses from nodes for the given URI & HTTP method must be interpreted in order to merge them - * - * @param uri the URI of the request - * @param httpMethod the HTTP Method of the request - * @return <code>true</code> if the response must be interpreted, <code>false</code> otherwise - */ - boolean isResponseInterpreted(URI uri, String httpMethod); -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b1d15a7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java new file mode 100644 index 0000000..2f1bb18 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.coordination.http; + +import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpiontMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.CurrentUserEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpiontMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.FlowConfigurationEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.FlowMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.FunnelEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.FunnelsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.InputPortsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.LabelsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.OutputPortsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger; +import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.stream.io.NullOutputStream; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class StandardHttpResponseMapper implements HttpResponseMapper { + + private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class); + + private final List<EndpointResponseMerger> endpointMergers = new ArrayList<>(); + + public StandardHttpResponseMapper(final NiFiProperties nifiProperties) { + final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); + long snapshotMillis; + try { + snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); + } + endpointMergers.add(new ControllerStatusEndpointMerger()); + endpointMergers.add(new ControllerBulletinsEndpointMerger()); + endpointMergers.add(new GroupStatusEndpointMerger()); + endpointMergers.add(new ProcessorStatusEndpointMerger()); + endpointMergers.add(new ConnectionStatusEndpiontMerger()); + endpointMergers.add(new PortStatusEndpointMerger()); + endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger()); + endpointMergers.add(new ProcessorEndpointMerger()); + endpointMergers.add(new ProcessorsEndpointMerger()); + endpointMergers.add(new ConnectionEndpointMerger()); + endpointMergers.add(new ConnectionsEndpointMerger()); + endpointMergers.add(new PortEndpointMerger()); + endpointMergers.add(new InputPortsEndpointMerger()); + endpointMergers.add(new OutputPortsEndpointMerger()); + endpointMergers.add(new RemoteProcessGroupEndpointMerger()); + endpointMergers.add(new RemoteProcessGroupsEndpointMerger()); + endpointMergers.add(new ProcessGroupEndpointMerger()); + endpointMergers.add(new ProcessGroupsEndpointMerger()); + endpointMergers.add(new FlowSnippetEndpointMerger()); + endpointMergers.add(new ProvenanceQueryEndpointMerger()); + endpointMergers.add(new ProvenanceEventEndpointMerger()); + endpointMergers.add(new ControllerServiceEndpointMerger()); + endpointMergers.add(new ControllerServicesEndpointMerger()); + endpointMergers.add(new ControllerServiceReferenceEndpointMerger()); + endpointMergers.add(new ReportingTaskEndpointMerger()); + endpointMergers.add(new ReportingTasksEndpointMerger()); + endpointMergers.add(new DropRequestEndpiontMerger()); + endpointMergers.add(new ListFlowFilesEndpointMerger()); + endpointMergers.add(new ComponentStateEndpointMerger()); + endpointMergers.add(new BulletinBoardEndpointMerger()); + endpointMergers.add(new StatusHistoryEndpointMerger(snapshotMillis)); + endpointMergers.add(new SystemDiagnosticsEndpointMerger()); + endpointMergers.add(new CountersEndpointMerger()); + endpointMergers.add(new FlowMerger()); + endpointMergers.add(new ControllerConfigurationEndpointMerger()); + endpointMergers.add(new CurrentUserEndpointMerger()); + endpointMergers.add(new FlowConfigurationEndpointMerger()); + endpointMergers.add(new TemplatesEndpointMerger()); + endpointMergers.add(new LabelEndpointMerger()); + endpointMergers.add(new LabelsEndpointMerger()); + endpointMergers.add(new FunnelEndpointMerger()); + endpointMergers.add(new FunnelsEndpointMerger()); + } + + @Override + public NodeResponse mapResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses, final boolean merge) { + final boolean hasSuccess = hasSuccessfulResponse(nodeResponses); + if (!hasSuccess) { + // If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that. + // Otherwise, it doesn't matter which one we choose. We do this because if we replicate + // a mutable request, it's possible that one node will respond with a 409, for instance, while + // others respond with a 150-Continue. We do not want to pick the 150-Continue; instead, we want + // the failed response. + final NodeResponse clientResponse = nodeResponses.stream().filter(p -> p.getStatus() > 299).findAny().orElse(nodeResponses.iterator().next()); + + // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't + // leave data lingering on the socket and ensures that we don't consume the content of the response + // that we intend to respond with + drainResponses(nodeResponses, clientResponse); + return clientResponse; + } + + // Determine which responses are successful + final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet()); + final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet()); + + final NodeResponse clientResponse; + if ("GET".equalsIgnoreCase(httpMethod) && problematicResponses.size() > 0) { + // If there are problematic responses, at least one of the nodes couldn't complete the request + clientResponse = problematicResponses.stream().filter(p -> p.getStatus() >= 400 && p.getStatus() < 500).findFirst().orElse( + problematicResponses.stream().filter(p -> p.getStatus() > 500).findFirst().orElse(problematicResponses.iterator().next())); + return clientResponse; + } else { + // Choose any of the successful responses to be the 'chosen one'. + clientResponse = successResponses.iterator().next(); + } + + if (merge == false) { + return clientResponse; + } + + EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod); + if (merger == null) { + return clientResponse; + } + + final NodeResponse response = merger.merge(uri, httpMethod, successResponses, problematicResponses, clientResponse); + return response; + } + + @Override + public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) { + // Check if there are any 2xx responses + final boolean containsSuccessfulResponse = hasSuccessfulResponse(allResponses); + + if (containsSuccessfulResponse) { + // If there is a 2xx response, we consider a response to be problematic if it is not 2xx + return allResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet()); + } else { + // If no node is successful, we consider a problematic response to be only those that are 5xx + return allResponses.stream().filter(p -> p.is5xx()).collect(Collectors.toSet()); + } + } + + @Override + public boolean isResponseInterpreted(final URI uri, final String httpMethod) { + return getEndpointResponseMerger(uri, httpMethod) != null; + } + + private EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) { + return endpointMergers.stream().filter(p -> p.canHandle(uri, httpMethod)).findFirst().orElse(null); + } + + private boolean hasSuccessfulResponse(final Set<NodeResponse> allResponses) { + return allResponses.stream().anyMatch(p -> p.is2xx()); + } + + private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) { + responses.stream() + .parallel() // parallelize the draining of the responses, since we have multiple streams to consume + .filter(response -> response != exclude) // don't include the explicitly excluded node + .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content + .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out + } + + private void drainResponse(final NodeResponse response) { + if (response.hasThrowable()) { + return; + } + + try { + ((StreamingOutput) response.getResponse().getEntity()).write(new NullOutputStream()); + } catch (final IOException ioe) { + logger.info("Failed clearing out non-client response buffer from " + response.getNodeId() + " due to: " + ioe, ioe); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b1d15a7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java deleted file mode 100644 index 7b1d8f6..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster.coordination.http; - -import org.apache.nifi.cluster.coordination.http.endpoints.BulletinBoardEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ComponentStateEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionStatusEndpiontMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ControllerBulletinsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ControllerConfigurationEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServiceReferenceEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ControllerServicesEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ControllerStatusEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.CountersEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.CurrentUserEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.DropRequestEndpiontMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.FlowConfigurationEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.FlowMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.FlowSnippetEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.FunnelEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.FunnelsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.GroupStatusEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.InputPortsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.LabelsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.OutputPortsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.PortEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceEventEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ProvenanceQueryEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupStatusEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.RemoteProcessGroupsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTaskEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.ReportingTasksEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.StatusHistoryEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.SystemDiagnosticsEndpointMerger; -import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerger; -import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.stream.io.NullOutputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.core.StreamingOutput; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; - -public class StandardHttpResponseMerger implements HttpResponseMerger { - - private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class); - - private final List<EndpointResponseMerger> endpointMergers = new ArrayList<>(); - - public StandardHttpResponseMerger(final NiFiProperties nifiProperties) { - final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); - long snapshotMillis; - try { - snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS); - } catch (final Exception e) { - snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); - } - endpointMergers.add(new ControllerStatusEndpointMerger()); - endpointMergers.add(new ControllerBulletinsEndpointMerger()); - endpointMergers.add(new GroupStatusEndpointMerger()); - endpointMergers.add(new ProcessorStatusEndpointMerger()); - endpointMergers.add(new ConnectionStatusEndpiontMerger()); - endpointMergers.add(new PortStatusEndpointMerger()); - endpointMergers.add(new RemoteProcessGroupStatusEndpointMerger()); - endpointMergers.add(new ProcessorEndpointMerger()); - endpointMergers.add(new ProcessorsEndpointMerger()); - endpointMergers.add(new ConnectionEndpointMerger()); - endpointMergers.add(new ConnectionsEndpointMerger()); - endpointMergers.add(new PortEndpointMerger()); - endpointMergers.add(new InputPortsEndpointMerger()); - endpointMergers.add(new OutputPortsEndpointMerger()); - endpointMergers.add(new RemoteProcessGroupEndpointMerger()); - endpointMergers.add(new RemoteProcessGroupsEndpointMerger()); - endpointMergers.add(new ProcessGroupEndpointMerger()); - endpointMergers.add(new ProcessGroupsEndpointMerger()); - endpointMergers.add(new FlowSnippetEndpointMerger()); - endpointMergers.add(new ProvenanceQueryEndpointMerger()); - endpointMergers.add(new ProvenanceEventEndpointMerger()); - endpointMergers.add(new ControllerServiceEndpointMerger()); - endpointMergers.add(new ControllerServicesEndpointMerger()); - endpointMergers.add(new ControllerServiceReferenceEndpointMerger()); - endpointMergers.add(new ReportingTaskEndpointMerger()); - endpointMergers.add(new ReportingTasksEndpointMerger()); - endpointMergers.add(new DropRequestEndpiontMerger()); - endpointMergers.add(new ListFlowFilesEndpointMerger()); - endpointMergers.add(new ComponentStateEndpointMerger()); - endpointMergers.add(new BulletinBoardEndpointMerger()); - endpointMergers.add(new StatusHistoryEndpointMerger(snapshotMillis)); - endpointMergers.add(new SystemDiagnosticsEndpointMerger()); - endpointMergers.add(new CountersEndpointMerger()); - endpointMergers.add(new FlowMerger()); - endpointMergers.add(new ControllerConfigurationEndpointMerger()); - endpointMergers.add(new CurrentUserEndpointMerger()); - endpointMergers.add(new FlowConfigurationEndpointMerger()); - endpointMergers.add(new TemplatesEndpointMerger()); - endpointMergers.add(new LabelEndpointMerger()); - endpointMergers.add(new LabelsEndpointMerger()); - endpointMergers.add(new FunnelEndpointMerger()); - endpointMergers.add(new FunnelsEndpointMerger()); - } - - @Override - public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses) { - if (nodeResponses.size() == 1) { - return nodeResponses.iterator().next(); - } - - final boolean hasSuccess = hasSuccessfulResponse(nodeResponses); - if (!hasSuccess) { - // If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that. - // Otherwise, it doesn't matter which one we choose. We do this because if we replicate - // a mutable request, it's possible that one node will respond with a 409, for instance, while - // others respond with a 150-Continue. We do not want to pick the 150-Continue; instead, we want - // the failed response. - final NodeResponse clientResponse = nodeResponses.stream().filter(p -> p.getStatus() > 299).findAny().orElse(nodeResponses.iterator().next()); - - // Drain the response from all nodes except for the 'chosen one'. This ensures that we don't - // leave data lingering on the socket and ensures that we don't consume the content of the response - // that we intend to respond with - drainResponses(nodeResponses, clientResponse); - return clientResponse; - } - - // Determine which responses are successful - final Set<NodeResponse> successResponses = nodeResponses.stream().filter(p -> p.is2xx()).collect(Collectors.toSet()); - final Set<NodeResponse> problematicResponses = nodeResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet()); - - final NodeResponse clientResponse; - if ("GET".equalsIgnoreCase(httpMethod) && problematicResponses.size() > 0) { - // If there are problematic responses, at least one of the nodes couldn't complete the request - clientResponse = problematicResponses.stream().filter(p -> p.getStatus() >= 400 && p.getStatus() < 500).findFirst().orElse( - problematicResponses.stream().filter(p -> p.getStatus() > 500).findFirst().orElse(problematicResponses.iterator().next())); - return clientResponse; - } else { - // Choose any of the successful responses to be the 'chosen one'. - clientResponse = successResponses.iterator().next(); - } - EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod); - if (merger == null) { - return clientResponse; - } - - final NodeResponse response = merger.merge(uri, httpMethod, successResponses, problematicResponses, clientResponse); - return response; - } - - @Override - public Set<NodeResponse> getProblematicNodeResponses(final Set<NodeResponse> allResponses) { - // Check if there are any 2xx responses - final boolean containsSuccessfulResponse = hasSuccessfulResponse(allResponses); - - if (containsSuccessfulResponse) { - // If there is a 2xx response, we consider a response to be problematic if it is not 2xx - return allResponses.stream().filter(p -> !p.is2xx()).collect(Collectors.toSet()); - } else { - // If no node is successful, we consider a problematic response to be only those that are 5xx - return allResponses.stream().filter(p -> p.is5xx()).collect(Collectors.toSet()); - } - } - - @Override - public boolean isResponseInterpreted(final URI uri, final String httpMethod) { - return getEndpointResponseMerger(uri, httpMethod) != null; - } - - private EndpointResponseMerger getEndpointResponseMerger(final URI uri, final String httpMethod) { - return endpointMergers.stream().filter(p -> p.canHandle(uri, httpMethod)).findFirst().orElse(null); - } - - private boolean hasSuccessfulResponse(final Set<NodeResponse> allResponses) { - return allResponses.stream().anyMatch(p -> p.is2xx()); - } - - private void drainResponses(final Set<NodeResponse> responses, final NodeResponse exclude) { - responses.stream() - .parallel() // parallelize the draining of the responses, since we have multiple streams to consume - .filter(response -> response != exclude) // don't include the explicitly excluded node - .filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content - .forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out - } - - private void drainResponse(final NodeResponse response) { - if (response.hasThrowable()) { - return; - } - - try { - ((StreamingOutput) response.getResponse().getEntity()).write(new NullOutputStream()); - } catch (final IOException ioe) { - logger.info("Failed clearing out non-client response buffer from " + response.getNodeId() + " due to: " + ioe, ioe); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0b1d15a7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java index 3bcc8e7..926151e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java @@ -17,6 +17,12 @@ package org.apache.nifi.cluster.coordination.http.replication; +import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.URI; import java.util.Collections; import java.util.HashMap; @@ -27,12 +33,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class StandardAsyncClusterResponse implements AsyncClusterResponse { private static final Logger logger = LoggerFactory.getLogger(StandardAsyncClusterResponse.class); @@ -40,10 +40,11 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { private final Set<NodeIdentifier> nodeIds; private final URI uri; private final String method; - private final HttpResponseMerger responseMerger; + private final HttpResponseMapper responseMapper; private final CompletionCallback completionCallback; private final Runnable completedResultFetchedCallback; private final long creationTimeNanos; + private final boolean merge; private final Map<NodeIdentifier, ResponseHolder> responseMap = new HashMap<>(); private final AtomicInteger requestsCompleted = new AtomicInteger(0); @@ -52,18 +53,19 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { private RuntimeException failure; // guarded by synchronizing on this public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds, - final HttpResponseMerger responseMerger, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback) { + final HttpResponseMapper responseMapper, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) { this.id = id; this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds)); this.uri = uri; this.method = method; + this.merge = merge; creationTimeNanos = System.nanoTime(); for (final NodeIdentifier nodeId : nodeIds) { responseMap.put(nodeId, new ResponseHolder(creationTimeNanos)); } - this.responseMerger = responseMerger; + this.responseMapper = responseMapper; this.completionCallback = completionCallback; this.completedResultFetchedCallback = completedResultFetchedCallback; } @@ -142,7 +144,7 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { .map(p -> p.getResponse()) .filter(response -> response != null) .collect(Collectors.toSet()); - mergedResponse = responseMerger.mergeResponses(uri, method, nodeResponses); + mergedResponse = responseMapper.mapResponses(uri, method, nodeResponses, merge); logger.debug("Notifying all that merged response is complete for {}", id); this.notifyAll(); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b1d15a7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index c1ee77b..258588d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -27,8 +27,8 @@ import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; -import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; +import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; +import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.manager.NodeResponse; @@ -85,7 +85,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private final Client client; // the client to use for issuing requests private final int connectionTimeoutMs; // connection timeout per node request private final int readTimeoutMs; // read timeout per node request - private final HttpResponseMerger responseMerger; + private final HttpResponseMapper responseMapper; private final EventReporter eventReporter; private final RequestCompletionCallback callback; private final ClusterCoordinator clusterCoordinator; @@ -140,7 +140,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { this.clusterCoordinator = clusterCoordinator; this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS); this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS); - this.responseMerger = new StandardHttpResponseMerger(nifiProperties); + this.responseMapper = new StandardHttpResponseMapper(nifiProperties); this.eventReporter = eventReporter; this.callback = callback; @@ -249,12 +249,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { lock.lock(); try { logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri); - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification); + return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true); } finally { lock.unlock(); } } else { - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification); + return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true); } } @@ -269,7 +269,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain); } - return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false); + return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false); } /** @@ -286,7 +286,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @return an AsyncClusterResponse that can be used to obtain the result */ private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification, - StandardAsyncClusterResponse response, boolean executionPhase) { + StandardAsyncClusterResponse response, boolean executionPhase, boolean merge) { // state validation Objects.requireNonNull(nodeIds); @@ -344,7 +344,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // create a response object if one was not already passed to us if (response == null) { response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, - responseMerger, completionCallback, responseConsumedCallback); + responseMapper, completionCallback, responseConsumedCallback, merge); responseMap.put(requestId, response); } @@ -358,7 +358,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final boolean mutableRequest = isMutableRequest(method, uri.getPath()); if (mutableRequest && performVerification) { logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId); - performVerification(nodeIds, method, uri, entity, updatedHeaders, response); + performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge); return response; } @@ -383,7 +383,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } - private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) { + private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse, boolean merge) { logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath()); final Map<String, String> validationHeaders = new HashMap<>(headers); @@ -418,7 +418,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // to all nodes and we are finished. if (dissentingCount == 0) { logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath()); - replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true); + replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge); return; } @@ -743,7 +743,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // create the resource WebResource resource = client.resource(uri); - if (responseMerger.isResponseInterpreted(uri, method)) { + if (responseMapper.isResponseInterpreted(uri, method)) { resource.addFilter(new GZIPContentEncodingFilter(false)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/0b1d15a7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index e50d8fa..a17e0de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -16,26 +16,11 @@ */ package org.apache.nifi.cluster.coordination.node; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; -import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; -import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; +import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; +import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback; import org.apache.nifi.cluster.event.Event; import org.apache.nifi.cluster.event.NodeEvent; @@ -69,6 +54,22 @@ import org.apache.nifi.web.revision.RevisionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandler, RequestCompletionCallback { private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class); @@ -900,7 +901,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl * state even if they had problems handling the request. */ if (mutableRequest) { - final HttpResponseMerger responseMerger = new StandardHttpResponseMerger(nifiProperties); + final HttpResponseMapper responseMerger = new StandardHttpResponseMapper(nifiProperties); final Set<NodeResponse> problematicNodeResponses = responseMerger.getProblematicNodeResponses(nodeResponses); // all nodes failed http://git-wip-us.apache.org/repos/asf/nifi/blob/0b1d15a7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java index e7ab881..3c18ced 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java @@ -33,7 +33,7 @@ public class PropertyDescriptorDtoMerger { for (final Map.Entry<NodeIdentifier, PropertyDescriptorDTO> nodeEntry : dtoMap.entrySet()) { final PropertyDescriptorDTO nodePropertyDescriptor = nodeEntry.getValue(); final List<AllowableValueEntity> nodePropertyDescriptorAllowableValues = nodePropertyDescriptor.getAllowableValues(); - if (clientPropertyDescriptor != nodePropertyDescriptor && nodePropertyDescriptorAllowableValues != null) { + if (nodePropertyDescriptorAllowableValues != null) { nodePropertyDescriptorAllowableValues.stream().forEach(allowableValueEntity -> { allowableValueMap.computeIfAbsent(nodePropertyDescriptorAllowableValues.indexOf(allowableValueEntity), propertyDescriptorToAllowableValue -> new ArrayList<>()) .add(allowableValueEntity); http://git-wip-us.apache.org/repos/asf/nifi/blob/0b1d15a7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy new file mode 100644 index 0000000..243fd1a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cluster.coordination.http + +import com.sun.jersey.api.client.ClientResponse +import org.apache.nifi.cluster.manager.NodeResponse +import org.apache.nifi.cluster.protocol.NodeIdentifier +import org.apache.nifi.util.NiFiProperties +import org.apache.nifi.web.api.dto.ConnectionDTO +import org.apache.nifi.web.api.dto.ControllerConfigurationDTO +import org.apache.nifi.web.api.dto.FunnelDTO +import org.apache.nifi.web.api.dto.LabelDTO +import org.apache.nifi.web.api.dto.PermissionsDTO +import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO +import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO +import org.apache.nifi.web.api.entity.ConnectionEntity +import org.apache.nifi.web.api.entity.ConnectionsEntity +import org.apache.nifi.web.api.entity.ControllerConfigurationEntity +import org.apache.nifi.web.api.entity.FunnelEntity +import org.apache.nifi.web.api.entity.FunnelsEntity +import org.apache.nifi.web.api.entity.LabelEntity +import org.apache.nifi.web.api.entity.LabelsEntity +import org.codehaus.jackson.map.ObjectMapper +import org.codehaus.jackson.map.SerializationConfig +import org.codehaus.jackson.map.annotate.JsonSerialize +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector +import spock.lang.Specification +import spock.lang.Unroll + +@Unroll +class StandardHttpResponseMapperSpec extends Specification { + + def setup() { + def propFile = StandardHttpResponseMapperSpec.class.getResource("/conf/nifi.properties").getFile() + System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile + } + + def cleanup() { + System.clearProperty NiFiProperties.PROPERTIES_FILE_PATH + } + + def "MergeResponses: mixed HTTP GET response statuses, expecting #expectedStatus"() { + given: + def responseMapper = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null)) + def requestUri = new URI('http://server/resource') + def requestId = UUID.randomUUID().toString() + def Map<ClientResponse, Map<String, Integer>> mockToRequestEntity = [:] + def nodeResponseSet = nodeResponseData.collect { + int n = it.node + def clientResponse = Mock(ClientResponse) + mockToRequestEntity.put clientResponse, it + new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId) + } as Set + + when: + def returnedResponse = responseMapper.mapResponses(requestUri, 'get', nodeResponseSet, true).getStatus() + + then: + mockToRequestEntity.entrySet().forEach { + ClientResponse mockClientResponse = it.key + _ * mockClientResponse.getStatus() >> it.value.status + } + 0 * _ + returnedResponse == expectedStatus + + where: + nodeResponseData || expectedStatus + [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 401]] as Set || 401 + [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 403]] as Set || 403 + [[node: 1, status: 200], [node: 2, status: 403], [node: 3, status: 500]] as Set || 403 + [[node: 1, status: 200], [node: 2, status: 200], [node: 3, status: 500]] as Set || 500 + } + + def "MergeResponses: #responseEntities.size() HTTP 200 #httpMethod responses for #requestUriPart"() { + given: "json serialization setup" + def mapper = new ObjectMapper(); + def jaxbIntrospector = new JaxbAnnotationIntrospector(); + def SerializationConfig serializationConfig = mapper.getSerializationConfig(); + mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); + + and: "setup of the data to be used in the test" + def responseMerger = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null)) + def requestUri = new URI("http://server/$requestUriPart") + def requestId = UUID.randomUUID().toString() + def Map<ClientResponse, Object> mockToRequestEntity = [:] + def n = 0 + def nodeResponseSet = responseEntities.collect { + ++n + def clientResponse = Mock(ClientResponse) + mockToRequestEntity.put clientResponse, it + new NodeResponse(new NodeIdentifier("cluster-node-$n", 'addr', n, 'sktaddr', n * 10, 'stsaddr', n * 100, n * 1000, false, null), "get", requestUri, clientResponse, 500L, requestId) + } as Set + + when: + def returnedResponse = responseMerger.mapResponses(requestUri, httpMethod, nodeResponseSet, true) + + then: + mockToRequestEntity.entrySet().forEach { + ClientResponse mockClientResponse = it.key + def entity = it.value + _ * mockClientResponse.getStatus() >> 200 + 1 * mockClientResponse.getEntity(_) >> entity + } + responseEntities.size() == mockToRequestEntity.size() + 0 * _ + def returnedJson = mapper.writeValueAsString(returnedResponse.getUpdatedEntity()) + def expectedJson = mapper.writeValueAsString(expectedEntity) + returnedJson == expectedJson + + where: + requestUriPart | httpMethod | responseEntities || + expectedEntity + 'nifi-api/controller/config' | 'get' | [ + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)), + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)), + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] || + // expectedEntity + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)) + 'nifi-api/controller/config' | 'put' | [ + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)), + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)), + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] || + // expectedEntity + new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false), + component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)) + "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'get' | [ + new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new + ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] as Set), + new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status: new + ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 100)))] as Set), + new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new + ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 500)), component: new ConnectionDTO())] as Set)] || + // expectedEntity + new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), + status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900, + input: '0 (900 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0)))] as Set) + "nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'post' | [ + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO()), + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))), + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] || + // expectedEntity + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), + status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 900, input: '0 (900 bytes)', + output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0))) + "nifi-api/connections/${UUID.randomUUID()}" | 'get' | [ + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 400)), component: new ConnectionDTO()), + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300))), + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: + new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] || + // expectedEntity + new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false), + status: new ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 1000, + input: '0 (1,000 bytes)', output: '0 (0 bytes)', queued: '0 (0 bytes)', queuedSize: '0 bytes', queuedCount: 0))) + "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'get' | [ + new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set), + new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set), + new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] as Set)] || + // expectedEntity + new LabelsEntity(labels: [new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set) + "nifi-api/process-groups/${UUID.randomUUID()}/labels" | 'post' | [ + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()), + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)), + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] || + // expectedEntity + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + "nifi-api/labels/${UUID.randomUUID()}" | 'get' | [ + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO()), + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)), + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new LabelDTO())] || + // expectedEntity + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'get' | [ + new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set), + new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set), + new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] as Set)] || + // expectedEntity + new FunnelsEntity(funnels: [new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false))] as Set) + "nifi-api/process-groups/${UUID.randomUUID()}/funnels" | 'post' | [ + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()), + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)), + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] || + // expectedEntity + new LabelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + "nifi-api/funnels/${UUID.randomUUID()}" | 'get' | [ + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO()), + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)), + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), component: new FunnelDTO())] || + // expectedEntity + new FunnelEntity(id: '1', permissions: new PermissionsDTO(canRead: false, canWrite: false)) + } +}