METRON-1427: Add support for storm 1.1 and hdp 2.6 (cstella via mmiklavc) closes apache/metron#907
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/644e951c Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/644e951c Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/644e951c Branch: refs/heads/feature/METRON-1344-test-infrastructure Commit: 644e951c8b2c33b5e52602a814d91012c3b325b1 Parents: 0874571 Author: cstella <ceste...@gmail.com> Authored: Tue Jan 30 10:54:57 2018 -0700 Committer: Michael Miklavcic <michael.miklav...@gmail.com> Committed: Tue Jan 30 10:54:57 2018 -0700 ---------------------------------------------------------------------- metron-analytics/metron-profiler/pom.xml | 4 +- .../roles/ambari_common/defaults/main.yml | 4 +- .../roles/ambari_config/defaults/main.yml | 2 +- .../roles/ambari_config/vars/single_node_vm.yml | 7 ++ .../roles/ambari_config/vars/small_cluster.yml | 7 ++ .../roles/ambari_gather_facts/defaults/main.yml | 19 ++++++ .../roles/ambari_gather_facts/tasks/main.yml | 67 ++++++++++++-------- .../metron-mpack/src/main/resources/mpack.json | 14 ++++ .../apache/metron/rest/config/KafkaConfig.java | 8 ++- .../rest/service/impl/StormCLIWrapper.java | 3 +- .../apache/metron/common/utils/KafkaUtils.java | 27 ++++++++ metron-platform/metron-elasticsearch/pom.xml | 4 +- .../parsers/topology/ParserTopologyBuilder.java | 3 +- metron-platform/metron-solr/pom.xml | 12 +++- .../kafka/flux/SimpleStormKafkaBuilder.java | 2 + .../apache/metron/writer/kafka/KafkaWriter.java | 1 + 16 files changed, 144 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-analytics/metron-profiler/pom.xml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/pom.xml b/metron-analytics/metron-profiler/pom.xml index d634cef..4d36782 100644 --- a/metron-analytics/metron-profiler/pom.xml +++ b/metron-analytics/metron-profiler/pom.xml @@ -341,8 +341,8 @@ <shadedPattern>org.apache.metron.guava.metron-profiler</shadedPattern> </relocation> <relocation> - <pattern>com.fasterxml.jackson.core</pattern> - <shadedPattern>com.fasterxml.jackson.core.metron.elasticsearch</shadedPattern> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>org.apache.metron.jackson</shadedPattern> </relocation> </relocations> <artifactSet> http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_common/defaults/main.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/ansible/roles/ambari_common/defaults/main.yml b/metron-deployment/ansible/roles/ambari_common/defaults/main.yml index c04400e..0614e0f 100644 --- a/metron-deployment/ansible/roles/ambari_common/defaults/main.yml +++ b/metron-deployment/ansible/roles/ambari_common/defaults/main.yml @@ -17,7 +17,7 @@ --- hadoop_logrotate_frequency: daily hadoop_logrotate_retention: 30 -centos_ambari_install_url: http://public-repo-1.hortonworks.com/ambari/centos6/2.x/updates/2.4.2.0/ambari.repo -ubuntu_ambari_repo: http://public-repo-1.hortonworks.com/ambari/ubuntu14/2.x/updates/2.4.2.0 +centos_ambari_install_url: http://public-repo-1.hortonworks.com/ambari/centos6/2.x/updates/2.5.2.0/ambari.repo +ubuntu_ambari_repo: http://public-repo-1.hortonworks.com/ambari/ubuntu14/2.x/updates/2.5.2.0 ubuntu_elasticsearch_packages_repo: https://artifacts.elastic.co/packages/5.x/apt ubuntu_elasticsearch_curator_repo: https://packages.elastic.co/curator/5/debian http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_config/defaults/main.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/ansible/roles/ambari_config/defaults/main.yml b/metron-deployment/ansible/roles/ambari_config/defaults/main.yml index e0de145..ad7ca9e 100644 --- a/metron-deployment/ansible/roles/ambari_config/defaults/main.yml +++ b/metron-deployment/ansible/roles/ambari_config/defaults/main.yml @@ -34,5 +34,5 @@ mapred_reduce_java_opts : -Xmx1024m mapred_map_mem_mb : 1229 mapred_reduce_mem_mb : 1229 topology_classpath: '/etc/hbase/conf:/etc/hadoop/conf' -hdp_stack: "2.5" +hdp_stack: "2.6" elasticsearch_network_interface: _site_ http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml b/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml index 6a60902..bf54fe0 100644 --- a/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml +++ b/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml @@ -87,6 +87,13 @@ configurations: supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704, 6705]" storm.local.dir: '{{ storm_local_dir }}' topology.classpath: '{{ topology_classpath }}' + # Storm expects ambari metrics to be available in 2.6. We do *not* install ambari metrics in full-dev, so we need to revert to the old consumer + storm.cluster.metrics.consumer.register: '[{"class": "org.apache.storm.metric.LoggingMetricsConsumer"}]' + topology.metrics.consumer.register: '[{"class": "org.apache.storm.metric.LoggingMetricsConsumer", "parallelism.hint": 1, "whitelist": ["kafkaOffset\\..+/", "__complete-latency", "__process-latency", "__receive\\.population$", "__sendqueue\\.population$", "__execute-count", "__emit-count", "__ack-count", "__fail-count", "memory/heap\\.usedBytes$", "memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", "GC/.+\\.timeMs$"]}]' + # Storm expects ambari metrics to be available in 2.6 and ambari metrics pulls data via JMX, but since we don't use ambari metrics here, we don't have the javaagent around to use and thus that must be removed from nimbus, supervisor and worker properties + nimbus.childopts: '-Xmx1024m _JAAS_PLACEHOLDER' + supervisor.childopts: '-Xmx256m _JAAS_PLACEHOLDER' + worker.childopts: "-Xmx768m _JAAS_PLACEHOLDER" - kafka-env: content: "{% raw %}\n#!/bin/bash\n\n# Set KAFKA specific environment variables here.\n\n# The java implementation to use.\nexport KAFKA_HEAP_OPTS=\"-Xms256M -Xmx256M\"\nexport KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:+DisableExplicitGC -Djava.awt.headless=true\"\nexport JAVA_HOME={{java64_home}}\nexport PATH=$PATH:$JAVA_HOME/bin\nexport PID_DIR={{kafka_pid_dir}}\nexport LOG_DIR={{kafka_log_dir}}\nexport KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}\n# Add kafka sink to classpath and related depenencies\nif [ -e \"/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\" ]; then\n export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\n export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*\nfi\nif [ -f /etc/kafka/conf/kafka-ranger-env.sh ]; then\n . /etc/kafka/conf/kafka-ranger-env.sh\nfi{% endraw %}" - kafka-broker: http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml b/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml index 4ec8458..218e267 100644 --- a/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml +++ b/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml @@ -85,6 +85,13 @@ configurations: supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704, 6705]" storm.local.dir: '{{ storm_local_dir | default("/hadoop/storm") }}' topology.classpath: '{{ topology_classpath }}' + # Storm expects ambari metrics to be available in 2.6. We do *not* install ambari metrics in full-dev, so we need to revert to the old consumer + storm.cluster.metrics.consumer.register: '[{"class": "org.apache.storm.metric.LoggingMetricsConsumer"}]' + topology.metrics.consumer.register: '[{"class": "org.apache.storm.metric.LoggingMetricsConsumer", "parallelism.hint": 1, "whitelist": ["kafkaOffset\\..+/", "__complete-latency", "__process-latency", "__receive\\.population$", "__sendqueue\\.population$", "__execute-count", "__emit-count", "__ack-count", "__fail-count", "memory/heap\\.usedBytes$", "memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", "GC/.+\\.timeMs$"]}]' + # Storm expects ambari metrics to be available in 2.6 and ambari metrics pulls data via JMX, but since we don't use ambari metrics here, we don't have the javaagent around to use and thus that must be removed from nimbus, supervisor and worker properties + nimbus.childopts: '-Xmx1024m _JAAS_PLACEHOLDER' + supervisor.childopts: '-Xmx256m _JAAS_PLACEHOLDER' + worker.childopts: "-Xmx768m _JAAS_PLACEHOLDER" - kafka-broker: log.dirs: '{{ kafka_log_dirs | default("/kafka-log") }}' - metron-rest-env: http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_gather_facts/defaults/main.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/ansible/roles/ambari_gather_facts/defaults/main.yml b/metron-deployment/ansible/roles/ambari_gather_facts/defaults/main.yml new file mode 100644 index 0000000..5351a60 --- /dev/null +++ b/metron-deployment/ansible/roles/ambari_gather_facts/defaults/main.yml @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +--- +curl: "curl -s -u {{ ambari_user }}:{{ ambari_password }} -X GET -H \"X-Requested-By: ambari\"" +parse_json: "import sys, json; print json.load(sys.stdin)" http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml ---------------------------------------------------------------------- diff --git a/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml b/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml index 2b37eec..25f0982 100644 --- a/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml +++ b/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml @@ -32,55 +32,55 @@ cluster_name: "{{ (cluster_name_response.content | from_json)['items'][0].Clusters.cluster_name }}" when: cluster_name is undefined +- set_fact: + base_url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port }}/api/v1/clusters/{{ cluster_name }}" + # # namenode_host # - name: "Ask Ambari: namenode_host" - uri: - url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port }}/api/v1/clusters/{{ cluster_name }}/services/HDFS/components/NAMENODE" - user: "{{ ambari_user }}" - password: "{{ ambari_password }}" - force_basic_auth: yes - return_content: yes - register: namenode_hosts_response + shell: > + {{ curl }} '{{ base_url }}/services/HDFS/components/NAMENODE' \ + | python -c '{{ parse_json }}["host_components"][0]["HostRoles"]["host_name"]' + args: + warn: false + register: namenode_host_response when: namenode_host is undefined - set_fact: - namenode_host: "{{ (namenode_hosts_response.content | from_json).host_components[0].HostRoles.host_name }}" + namenode_host: "{{ namenode_host_response.stdout_lines[0] }}" when: namenode_host is undefined # # core_site_tag # - name: "Ask Ambari: core_site_tag" - uri: - url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port }}/api/v1/clusters/{{ cluster_name }}/hosts/{{ namenode_host }}/host_components/NAMENODE" - user: "{{ ambari_user }}" - password: "{{ ambari_password }}" - force_basic_auth: yes - return_content: yes + shell: > + {{ curl }} '{{ base_url }}/hosts/{{ namenode_host }}/host_components/NAMENODE' \ + | python -c '{{ parse_json }}["HostRoles"]["actual_configs"]["core-site"]["default"]' + args: + warn: false register: core_site_tag_response when: core_site_tag is undefined - set_fact: - core_site_tag: "{{ (core_site_tag_response.content | from_json).HostRoles.actual_configs['core-site'].default }}" + core_site_tag: "{{ core_site_tag_response.stdout_lines[0] }}" when: core_site_tag is undefined # # hdfs_url # - name: "Ask Ambari: hdfs_url" - uri: - url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port }}/api/v1/clusters/{{ cluster_name }}/configurations?type=core-site&tag={{ core_site_tag }}" - user: "{{ ambari_user }}" - password: "{{ ambari_password }}" - force_basic_auth: yes - return_content: yes - register: core_site_response + shell: > + {{ curl }} '{{ base_url }}/configurations?type=core-site&tag={{ core_site_tag }}' \ + | python -c '{{ parse_json }}["items"][0]["properties"]["fs.defaultFS"]' + args: + warn: false + register: hdfs_url_response when: hdfs_url is undefined - set_fact: - hdfs_url: "{{ (core_site_response.content | from_json)['items'][0].properties['fs.defaultFS'] }}" + hdfs_url: "{{ hdfs_url_response.stdout_lines[0] }}" when: hdfs_url is undefined # @@ -88,7 +88,7 @@ # - name: "Ask Ambari: kafka_broker_hosts" uri: - url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port }}/api/v1/clusters/{{ cluster_name }}/services/KAFKA/components/KAFKA_BROKER" + url: "{{ base_url }}/services/KAFKA/components/KAFKA_BROKER" user: "{{ ambari_user }}" password: "{{ ambari_password }}" force_basic_auth: yes @@ -105,7 +105,7 @@ # - name: "Ask Ambari: kafka_broker_tag" uri: - url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port }}/api/v1/clusters/{{ cluster_name }}/hosts/{{ kafka_broker_hosts[0] }}/host_components/KAFKA_BROKER" + url: "{{ base_url }}/hosts/{{ kafka_broker_hosts[0] }}/host_components/KAFKA_BROKER" user: "{{ ambari_user }}" password: "{{ ambari_password }}" force_basic_auth: yes @@ -122,7 +122,8 @@ # - name: "Ask Ambari: kafka_broker_port" shell: > - curl -s -u {{ ambari_user }}:{{ ambari_password }} -X GET -H "X-Requested-By: ambari" "http://{{ groups.ambari_master[0] }}:{{ ambari_port }}/api/v1/clusters/{{ cluster_name }}/configurations?type=kafka-broker&tag={{ kafka_broker_tag }}" | python -c 'import sys, json; print json.load(sys.stdin)["items"][0]["properties"]["listeners"]' + {{ curl }} '{{ base_url }}/configurations?type=kafka-broker&tag={{ kafka_broker_tag }}' \ + | python -c '{{ parse_json }}["items"][0]["properties"]["listeners"]' args: warn: false register: kafka_broker_port_response @@ -191,6 +192,9 @@ zookeeper_url: "{% for host in zookeeper_hosts %}{% if loop.index != 1 %},{% endif %}{{ host }}:{{ zookeeper_port }}{% endfor %}" when: zookeeper_url is undefined +# +# metron_hosts +# - name: "Ask Ambari: metron_hosts" uri: url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port }}/api/v1/clusters/{{ cluster_name }}/services/METRON/components/METRON_INDEXING" @@ -205,6 +209,9 @@ metron_hosts: "{{ (metron_hosts_response.content | from_json).host_components | map(attribute='HostRoles.host_name') | list }}" when: metron_hosts is undefined +# +# kibana hosts +# - name: "Ask Ambari: kibana_hosts" uri: url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port }}/api/v1/clusters/{{ cluster_name }}/services/KIBANA/components/KIBANA_MASTER" @@ -225,10 +232,14 @@ # - name: debug debug: - msg: "zookeeper_port = {{ zookeeper_port }}, + msg: "cluster_name = {{ cluster_name }}, + namenode_host = {{ namenode_host }}, + hdfs_url = {{ hdfs_url }}, + zookeeper_port = {{ zookeeper_port }}, zookeeper_hosts = {{ zookeeper_hosts }}, zookeeper_url = {{ zookeeper_url }}, kafka_broker_port = {{ kafka_broker_port }}, kafka_broker_hosts = {{ kafka_broker_hosts }}, kafka_broker_url = {{ kafka_broker_url }}, - metron_hosts = {{ metron_hosts }}" + metron_hosts = {{ metron_hosts }}, + kibana_hosts = {{ kibana_hosts }}" http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json index 7a9d892..3946881 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json @@ -38,7 +38,12 @@ { "stack_name" : "HDP", "stack_version" : "2.5" + }, + { + "stack_name" : "HDP", + "stack_version" : "2.6" } + ] }, { @@ -56,8 +61,13 @@ { "stack_name" : "HDP", "stack_version" : "2.5" + }, + { + "stack_name" : "HDP", + "stack_version" : "2.6" } + ] }, { @@ -75,6 +85,10 @@ { "stack_name" : "HDP", "stack_version" : "2.5" + }, + { + "stack_name" : "HDP", + "stack_version" : "2.6" } ] http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java index a15c48f..7e9b468 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java @@ -22,6 +22,8 @@ import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.metron.common.utils.KafkaUtils; import org.apache.metron.rest.MetronRestConstants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -86,7 +88,7 @@ public class KafkaConfig { props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) { - props.put("security.protocol", environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)); + props.put("security.protocol", KafkaUtils.INSTANCE.normalizeProtocol(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY))); } return props; } @@ -109,11 +111,13 @@ public class KafkaConfig { producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerConfig.put("request.required.acks", 1); if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) { - producerConfig.put("security.protocol", environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)); + producerConfig.put("security.protocol", KafkaUtils.INSTANCE.normalizeProtocol(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY))); } return producerConfig; } + + @Bean public KafkaProducer kafkaProducer() { return new KafkaProducer<>(producerProperties()); http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java index 463c925..fff7390 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java @@ -18,6 +18,7 @@ package org.apache.metron.rest.service.impl; import org.apache.commons.lang3.StringUtils; +import org.apache.metron.common.utils.KafkaUtils; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.slf4j.Logger; @@ -117,7 +118,7 @@ public class StormCLIWrapper { // kafka security protocol command.add( "-ksp"); - command.add( environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)); + command.add(KafkaUtils.INSTANCE.normalizeProtocol(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY))); // extra topology options boolean kerberosEnabled = environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false); http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java index d54e2b8..796bc42 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java @@ -25,6 +25,7 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.kafka.common.protocol.SecurityProtocol; import java.util.ArrayList; import java.util.List; @@ -32,6 +33,7 @@ import java.util.Map; public enum KafkaUtils { INSTANCE; + public static final String SECURITY_PROTOCOL = "security.protocol"; public List<String> getBrokersFromZookeeper(String zkQuorum) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework framework = CuratorFrameworkFactory.newClient(zkQuorum, retryPolicy); @@ -66,6 +68,31 @@ public enum KafkaUtils { return ret; } + public Map<String, Object> normalizeProtocol(Map<String, Object> configs) { + if(configs.containsKey(SECURITY_PROTOCOL)) { + String protocol = normalizeProtocol((String)configs.get(SECURITY_PROTOCOL)); + configs.put(SECURITY_PROTOCOL, protocol); + } + return configs; + } + + public String normalizeProtocol(String protocol) { + if(protocol.equalsIgnoreCase("PLAINTEXTSASL") || protocol.equalsIgnoreCase("SASL_PLAINTEXT")) { + if(SecurityProtocol.getNames().contains("PLAINTEXTSASL")) { + return "PLAINTEXTSASL"; + } + else if(SecurityProtocol.getNames().contains("SASL_PLAINTEXT")) { + return "SASL_PLAINTEXT"; + } + else { + throw new IllegalStateException("Unable to find the appropriate SASL protocol, " + + "viable options are: " + Joiner.on(",").join(SecurityProtocol.getNames())); + } + } + else { + return protocol.trim(); + } + } /* The URL accepted is NOT a general URL, and is assumed to follow the format used by the Kafka structures in Zookeeper. See: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml index 97f4062..141d8aa 100644 --- a/metron-platform/metron-elasticsearch/pom.xml +++ b/metron-platform/metron-elasticsearch/pom.xml @@ -264,8 +264,8 @@ <shadedPattern>org.apache.metron.guava.metron-elasticsearch</shadedPattern> </relocation> <relocation> - <pattern>com.fasterxml.jackson.core</pattern> - <shadedPattern>com.fasterxml.jackson.core.metron.elasticsearch</shadedPattern> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>org.apache.metron.jackson</shadedPattern> </relocation> </relocations> <artifactSet> http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index c918703..1039e56 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -18,6 +18,7 @@ package org.apache.metron.parsers.topology; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.metron.common.utils.KafkaUtils; import org.apache.metron.parsers.topology.config.ValueSupplier; import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; import org.apache.metron.storm.kafka.flux.SpoutConfiguration; @@ -163,7 +164,7 @@ public class ParserTopologyBuilder { , inputTopic + "_parser" ); if(securityProtocol.isPresent()) { - kafkaSpoutConfigOptions.putIfAbsent("security.protocol", securityProtocol.get()); + kafkaSpoutConfigOptions.putIfAbsent("security.protocol", KafkaUtils.INSTANCE.normalizeProtocol(securityProtocol.get())); } return SimpleStormKafkaBuilder.create( inputTopic , zkQuorum http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-solr/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/pom.xml b/metron-platform/metron-solr/pom.xml index a2eee71..9c9c7fb 100644 --- a/metron-platform/metron-solr/pom.xml +++ b/metron-platform/metron-solr/pom.xml @@ -261,7 +261,17 @@ <exclude>META-INF/*.RSA</exclude> </excludes> </filter> - </filters> + </filters> + <relocations> + <relocation> + <pattern>com.google.common</pattern> + <shadedPattern>org.apache.metron.guava</shadedPattern> + </relocation> + <relocation> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>org.apache.metron.jackson</shadedPattern> + </relocation> + </relocations> <artifactSet> <excludes> <exclude>storm:storm-core:*</exclude> http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java index 1bcee9a..f99e549 100644 --- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java @@ -203,6 +203,8 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V , createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)), DEFAULT_DESERIALIZER) , subscription ); + + kafkaProps = KafkaUtils.INSTANCE.normalizeProtocol(kafkaProps); setProp(kafkaProps); setRecordTranslator(new SpoutRecordTranslator<>(FieldsConfiguration.toList(fieldsConfiguration))); } http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java index 7ce9b9b..f73e0f4 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java @@ -158,6 +158,7 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj producerConfig.put("value.serializer", valueSerializer); producerConfig.put("request.required.acks", requiredAcks); producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs); + producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig); return producerConfig; }