This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch 2.7.2_ds_rootless in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 53120ffa6799fc4b9883572b8f8d286856db0244 Author: Enrico Olivelli <eolive...@apache.org> AuthorDate: Thu May 13 14:26:10 2021 +0200 Remove Solr --- pulsar-io/pom.xml | 1 - pulsar-io/solr/pom.xml | 86 ----------- .../apache/pulsar/io/solr/SolrAbstractSink.java | 131 ---------------- .../pulsar/io/solr/SolrGenericRecordSink.java | 53 ------- .../org/apache/pulsar/io/solr/SolrSinkConfig.java | 98 ------------ .../resources/META-INF/services/pulsar-io.yaml | 23 --- .../pulsar/io/solr/SolrGenericRecordSinkTest.java | 114 -------------- .../org/apache/pulsar/io/solr/SolrServerUtil.java | 91 ----------- .../apache/pulsar/io/solr/SolrSinkConfigTest.java | 168 --------------------- pulsar-io/solr/src/test/resources/sinkConfig.yaml | 27 ---- pulsar-io/solr/src/test/resources/solr.xml | 38 ----- 11 files changed, 830 deletions(-) diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index 40b476a..a3911c0 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -64,7 +64,6 @@ <module>mongo</module> <module>flume</module> <module>redis</module> - <module>solr</module> <module>influxdb</module> <module>dynamodb</module> <module>nsq</module> diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml deleted file mode 100644 index a8d6103..0000000 --- a/pulsar-io/solr/pom.xml +++ /dev/null @@ -1,86 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <artifactId>pulsar-io</artifactId> - <groupId>org.apache.pulsar</groupId> - <version>2.7.2.1.0.0</version> - </parent> - - <properties> - <solr.version>8.6.3</solr.version> - </properties> - - <artifactId>pulsar-io-solr</artifactId> - <name>Pulsar IO :: Solr</name> - - <dependencies> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>pulsar-io-core</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-functions-instance</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-client-original</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.solr</groupId> - <artifactId>solr-solrj</artifactId> - <version>${solr.version}</version> - </dependency> - <dependency> - <groupId>org.apache.solr</groupId> - <artifactId>solr-core</artifactId> - <version>${solr.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>3.4</version> - </dependency> - <dependency> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - <version>3.2.2</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-nar-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - -</project> \ No newline at end of file diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java deleted file mode 100644 index d7bcb12..0000000 --- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.solr; - -import com.google.common.base.Strings; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.Sink; -import org.apache.pulsar.io.core.SinkContext; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.apache.solr.client.solrj.request.UpdateRequest; -import org.apache.solr.client.solrj.response.UpdateResponse; -import org.apache.solr.common.SolrInputDocument; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -/** - * A simple abstract class for Solr sink - */ -@Slf4j -public abstract class SolrAbstractSink<T> implements Sink<T> { - - private SolrSinkConfig solrSinkConfig; - private SolrClient client; - private boolean enableBasicAuth; - - @Override - public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { - solrSinkConfig = SolrSinkConfig.load(config); - solrSinkConfig.validate(); - - enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername()); - - SolrMode solrMode; - try { - solrMode = SolrMode.valueOf(solrSinkConfig.getSolrMode().toUpperCase()); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Illegal Solr mode, valid values are: " - + Arrays.asList(SolrMode.values())); - } - - client = getClient(solrMode, solrSinkConfig.getSolrUrl()); - } - - @Override - public void write(Record<T> record) { - UpdateRequest updateRequest = new UpdateRequest(); - if (solrSinkConfig.getSolrCommitWithinMs() > 0) { - updateRequest.setCommitWithin(solrSinkConfig.getSolrCommitWithinMs()); - } - if (enableBasicAuth) { - updateRequest.setBasicAuthCredentials( - solrSinkConfig.getUsername(), - solrSinkConfig.getPassword() - ); - } - - SolrInputDocument document = convert(record); - updateRequest.add(document); - - try { - UpdateResponse updateResponse = updateRequest.process(client, solrSinkConfig.getSolrCollection()); - if (updateResponse.getStatus() == 0) { - record.ack(); - } else { - record.fail(); - } - } catch (SolrServerException | IOException e) { - record.fail(); - log.warn("Solr update document exception ", e); - } - } - - @Override - public void close() throws Exception { - if (client != null) { - client.close(); - } - } - - // convert record as a Solr document - public abstract SolrInputDocument convert(Record<T> message); - - public static SolrClient getClient(SolrMode solrMode, String url) { - SolrClient solrClient = null; - if (solrMode.equals(SolrMode.STANDALONE)) { - HttpSolrClient.Builder builder = new HttpSolrClient.Builder(url); - solrClient = builder.build(); - } - if (solrMode.equals(SolrMode.SOLRCLOUD)) { - int chrootIndex = url.indexOf("/"); - Optional<String> chroot = Optional.empty(); - if (chrootIndex > 0) { - chroot = Optional.of(url.substring(chrootIndex)); - } - String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url; - List<String> zkHosts = Arrays.asList(zkUrls.split(",")); - CloudSolrClient.Builder builder = new CloudSolrClient.Builder(zkHosts, chroot); - solrClient = builder.build(); - } - return solrClient; - } - - public enum SolrMode { - STANDALONE, - SOLRCLOUD - } -} diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java deleted file mode 100644 index e1df871..0000000 --- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrGenericRecordSink.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.solr; - -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.schema.Field; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.annotations.Connector; -import org.apache.pulsar.io.core.annotations.IOType; -import org.apache.solr.common.SolrInputDocument; - -import java.util.List; - -/** - * A simple Solr sink, which interprets input Record in generic record. - */ -@Connector( - name = "solr", - type = IOType.SINK, - help = "The SolrGenericRecordSink is used for moving messages from Pulsar to Solr.", - configClass = SolrSinkConfig.class -) -@Slf4j -public class SolrGenericRecordSink extends SolrAbstractSink<GenericRecord> { - @Override - public SolrInputDocument convert(Record<GenericRecord> message) { - SolrInputDocument doc = new SolrInputDocument(); - GenericRecord record = message.getValue(); - List<Field> fields = record.getFields(); - for (Field field : fields) { - Object fieldValue = record.getField(field); - doc.setField(field.getName(), fieldValue); - } - return doc; - } -} diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java deleted file mode 100644 index b761b12..0000000 --- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.solr; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.google.common.base.Preconditions; -import lombok.Data; -import lombok.experimental.Accessors; -import org.apache.pulsar.io.core.annotations.FieldDoc; - -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.util.Map; - -/** - * Configuration class for the Solr Sink Connector. - */ -@Data -@Accessors(chain = true) -public class SolrSinkConfig implements Serializable { - - private static final long serialVersionUID = -4849066206354610110L; - - @FieldDoc( - required = true, - defaultValue = "", - help = "Comma separated zookeeper hosts with chroot used in SolrCloud mode (eg: localhost:2181,localhost:2182/chroot)" - + " or Url to connect to solr used in Standalone mode (e.g. localhost:8983/solr)" - ) - private String solrUrl; - - @FieldDoc( - required = true, - defaultValue = "SolrCloud", - help = "The client mode to use when interacting with the Solr cluster. Possible values [Standalone, SolrCloud]") - private String solrMode = "SolrCloud"; - - @FieldDoc( - required = true, - defaultValue = "", - help = "Solr collection name to which records need to be written") - private String solrCollection; - - @FieldDoc( - required = false, - defaultValue = "10", - help = "Commit within milli seconds for solr update, if none passes defaults to 10 ms") - private int solrCommitWithinMs = 10; - - @FieldDoc( - required = false, - defaultValue = "", - sensitive = true, - help = "The username to use for basic authentication") - private String username; - - @FieldDoc( - required = false, - defaultValue = "", - sensitive = true, - help = "The password to use for basic authentication") - private String password; - - public static SolrSinkConfig load(String yamlFile) throws IOException { - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - return mapper.readValue(new File(yamlFile), SolrSinkConfig.class); - } - - public static SolrSinkConfig load(Map<String, Object> map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(new ObjectMapper().writeValueAsString(map), SolrSinkConfig.class); - } - - public void validate() { - Preconditions.checkNotNull(solrUrl, "solrUrl property not set."); - Preconditions.checkNotNull(solrMode, "solrMode property not set."); - Preconditions.checkNotNull(solrCollection, "solrCollection property not set."); - Preconditions.checkArgument(solrCommitWithinMs > 0, "solrCommitWithinMs must be a positive integer."); - } -} diff --git a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml deleted file mode 100644 index 26347f5..0000000 --- a/pulsar-io/solr/src/main/resources/META-INF/services/pulsar-io.yaml +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -name: solr -description: Writes data into solr collection -sinkClass: org.apache.pulsar.io.solr.SolrGenericRecordSink -sinkConfigClass: org.apache.pulsar.io.solr.SolrSinkConfig diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java deleted file mode 100644 index 1bd0d9d..0000000 --- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.solr; - -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.api.schema.GenericSchema; -import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; -import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; -import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.functions.source.PulsarRecord; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * solr Sink test - */ -@Slf4j -public class SolrGenericRecordSinkTest { - - private SolrServerUtil solrServerUtil; - private Message<GenericRecord> message; - - /** - * A Simple class to test solr class - */ - @Data - public static class Foo { - private String field1; - private String field2; - } - - @BeforeMethod - public void setUp() throws Exception { - solrServerUtil = new SolrServerUtil(8983); - solrServerUtil.startStandaloneSolr(); - } - - @AfterMethod - public void tearDown() throws Exception { - solrServerUtil.stopStandaloneSolr(); - } - - @Test - public void TestOpenAndWriteSink() throws Exception { - message = mock(MessageImpl.class); - Map<String, Object> configs = new HashMap<>(); - configs.put("solrUrl", "http://localhost:8983/solr"); - configs.put("solrMode", "Standalone"); - configs.put("solrCollection", "techproducts"); - configs.put("solrCommitWithinMs", "100"); - configs.put("username", ""); - configs.put("password", ""); - GenericSchema<GenericRecord> genericAvroSchema; - - SolrGenericRecordSink sink = new SolrGenericRecordSink(); - - // prepare a foo Record - Foo obj = new Foo(); - obj.setField1("FakeFiled1"); - obj.setField2("FakeFiled1"); - AvroSchema<Foo> schema = AvroSchema.of(Foo.class); - - byte[] bytes = schema.encode(obj); - AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); - autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo())); - - Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder() - .message(message) - .topicName("fake_topic_name") - .build(); - - genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo()); - - when(message.getValue()) - .thenReturn(genericAvroSchema.decode(bytes)); - - log.info("foo:{}, Message.getValue: {}, record.getValue: {}", - obj.toString(), - message.getValue().toString(), - record.getValue().toString()); - - // open should success - sink.open(configs, null); - } -} diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java deleted file mode 100644 index 033d0d2..0000000 --- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrServerUtil.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.solr; - -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.FileUtils; -import org.apache.solr.client.solrj.embedded.JettySolrRunner; - -import java.io.File; - -@Slf4j -public class SolrServerUtil { - private JettySolrRunner standaloneSolr; - private int port; - - public SolrServerUtil(int port) { - this.port = port; - } - - public void startStandaloneSolr() throws Exception { - if (standaloneSolr != null) { - throw new IllegalStateException("Test is already running a standalone Solr instance " + - standaloneSolr.getBaseUrl() + "! This indicates a bug in the unit test logic."); - } - - File solrHomeDir = new File(FileUtils.getTempDirectory().getPath() + "/solr_home"); - String solrXml = "solr.xml"; - FileUtils.copyFile(getFile(solrXml), new File(solrHomeDir.getAbsolutePath() + "/" + solrXml)); - File solrLogDir = new File(solrHomeDir.getPath() + "/solr_logs"); - - createTempDir(solrHomeDir); - createTempDir(solrLogDir); - - System.setProperty("host", "localhost"); - System.setProperty("jetty.port", String.valueOf(port)); - System.setProperty("solr.log.dir", solrLogDir.getAbsolutePath()); - - standaloneSolr = new JettySolrRunner(solrHomeDir.getAbsolutePath(), "/solr", port); - Thread bg = new Thread() { - public void run() { - try { - standaloneSolr.start(); - } catch (Exception e) { - if (e instanceof RuntimeException) { - throw (RuntimeException)e; - } else { - throw new RuntimeException(e); - } - } - } - }; - bg.start(); - } - - public void stopStandaloneSolr() { - if (standaloneSolr != null) { - try { - standaloneSolr.stop(); - } catch (Exception e) { - log.error("Failed to stop standalone solr."); - } - } - } - - private File getFile(String name) { - ClassLoader classLoader = getClass().getClassLoader(); - return new File(classLoader.getResource(name).getFile()); - } - - private void createTempDir(File file) { - if (!file.exists() && !file.isDirectory()) { - file.mkdirs(); - } - } -} diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java deleted file mode 100644 index 493542f..0000000 --- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.solr; - -import com.google.common.collect.Lists; -import org.testng.annotations.Test; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; - -/** - * SolrSinkConfig test - */ -public class SolrSinkConfigTest { - - @Test - public final void loadFromYamlFileTest() throws IOException { - File yamlFile = getFile("sinkConfig.yaml"); - String path = yamlFile.getAbsolutePath(); - SolrSinkConfig config = SolrSinkConfig.load(path); - assertNotNull(config); - assertEquals("localhost:2181,localhost:2182/chroot", config.getSolrUrl()); - assertEquals("SolrCloud", config.getSolrMode()); - assertEquals("techproducts", config.getSolrCollection()); - assertEquals(Integer.parseInt("100"), config.getSolrCommitWithinMs()); - assertEquals("fakeuser", config.getUsername()); - assertEquals("fake@123", config.getPassword()); - } - - @Test - public final void loadFromMapTest() throws IOException { - Map<String, Object> map = new HashMap<>(); - map.put("solrUrl", "localhost:2181,localhost:2182/chroot"); - map.put("solrMode", "SolrCloud"); - map.put("solrCollection", "techproducts"); - map.put("solrCommitWithinMs", "100"); - map.put("username", "fakeuser"); - map.put("password", "fake@123"); - - SolrSinkConfig config = SolrSinkConfig.load(map); - assertNotNull(config); - assertEquals("localhost:2181,localhost:2182/chroot", config.getSolrUrl()); - assertEquals("SolrCloud", config.getSolrMode()); - assertEquals("techproducts", config.getSolrCollection()); - assertEquals(Integer.parseInt("100"), config.getSolrCommitWithinMs()); - assertEquals("fakeuser", config.getUsername()); - assertEquals("fake@123", config.getPassword()); - } - - @Test - public final void validValidateTest() throws IOException { - Map<String, Object> map = new HashMap<>(); - map.put("solrUrl", "localhost:2181,localhost:2182/chroot"); - map.put("solrMode", "SolrCloud"); - map.put("solrCollection", "techproducts"); - map.put("solrCommitWithinMs", "100"); - map.put("username", "fakeuser"); - map.put("password", "fake@123"); - - SolrSinkConfig config = SolrSinkConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = NullPointerException.class, - expectedExceptionsMessageRegExp = "solrUrl property not set.") - public final void missingValidValidateSolrModeTest() throws IOException { - Map<String, Object> map = new HashMap<>(); - map.put("solrMode", "SolrCloud"); - map.put("solrCollection", "techproducts"); - map.put("solrCommitWithinMs", "100"); - map.put("username", "fakeuser"); - map.put("password", "fake@123"); - - SolrSinkConfig config = SolrSinkConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "solrCommitWithinMs must be a positive integer.") - public final void invalidBatchTimeMsTest() throws IOException { - Map<String, Object> map = new HashMap<>(); - map.put("solrUrl", "localhost:2181,localhost:2182/chroot"); - map.put("solrMode", "SolrCloud"); - map.put("solrCollection", "techproducts"); - map.put("solrCommitWithinMs", "-100"); - map.put("username", "fakeuser"); - map.put("password", "fake@123"); - - SolrSinkConfig config = SolrSinkConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "No enum constant org.apache.pulsar.io.solr.SolrAbstractSink.SolrMode.NOTSUPPORT") - public final void invalidClientModeTest() throws IOException { - Map<String, Object> map = new HashMap<>(); - map.put("solrUrl", "localhost:2181,localhost:2182/chroot"); - map.put("solrMode", "NotSupport"); - map.put("solrCollection", "techproducts"); - map.put("solrCommitWithinMs", "100"); - map.put("username", "fakeuser"); - map.put("password", "fake@123"); - - SolrSinkConfig config = SolrSinkConfig.load(map); - config.validate(); - - SolrAbstractSink.SolrMode.valueOf(config.getSolrMode().toUpperCase()); - } - - @Test - public final void validZkChrootTest() throws IOException { - Map<String, Object> map = new HashMap<>(); - map.put("solrUrl", "localhost:2181,localhost:2182/chroot"); - map.put("solrMode", "SolrCloud"); - map.put("solrCollection", "techproducts"); - map.put("solrCommitWithinMs", "100"); - map.put("username", "fakeuser"); - map.put("password", "fake@123"); - - SolrSinkConfig config = SolrSinkConfig.load(map); - config.validate(); - - String url = config.getSolrUrl(); - int chrootIndex = url.indexOf("/"); - Optional<String> chroot = Optional.empty(); - if (chrootIndex > 0) { - chroot = Optional.of(url.substring(chrootIndex)); - } - String zkUrls = chrootIndex > 0 ? url.substring(0, chrootIndex) : url; - List<String> zkHosts = Arrays.asList(zkUrls.split(",")); - - List<String> expectedZkHosts = Lists.newArrayList(); - expectedZkHosts.add("localhost:2181"); - expectedZkHosts.add("localhost:2182"); - - assertEquals("/chroot", chroot.get()); - assertEquals(expectedZkHosts, zkHosts); - } - - private File getFile(String name) { - ClassLoader classLoader = getClass().getClassLoader(); - return new File(classLoader.getResource(name).getFile()); - } -} diff --git a/pulsar-io/solr/src/test/resources/sinkConfig.yaml b/pulsar-io/solr/src/test/resources/sinkConfig.yaml deleted file mode 100644 index d96b353..0000000 --- a/pulsar-io/solr/src/test/resources/sinkConfig.yaml +++ /dev/null @@ -1,27 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -{ -"solrUrl": "localhost:2181,localhost:2182/chroot", -"solrMode": "SolrCloud", -"solrCollection": "techproducts", -"solrCommitWithinMs": "100", -"username": "fakeuser", -"password": "fake@123" -} diff --git a/pulsar-io/solr/src/test/resources/solr.xml b/pulsar-io/solr/src/test/resources/solr.xml deleted file mode 100644 index 495a71f..0000000 --- a/pulsar-io/solr/src/test/resources/solr.xml +++ /dev/null @@ -1,38 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<solr> - <solrcloud> - <str name="host">${host:}</str> - <int name="hostPort">${jetty.port:8983}</int> - <str name="hostContext">${hostContext:solr}</str> - <bool name="genericCoreNodeNames">${genericCoreNodeNames:true}</bool> - <int name="zkClientTimeout">${zkClientTimeout:30000}</int> - <int name="distribUpdateSoTimeout">${distribUpdateSoTimeout:600000}</int> - <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int> - <str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str> - <str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str> - </solrcloud> - <shardHandlerFactory name="shardHandlerFactory" - class="HttpShardHandlerFactory"> - <int name="socketTimeout">${socketTimeout:600000}</int> - <int name="connTimeout">${connTimeout:60000}</int> - </shardHandlerFactory> -</solr> \ No newline at end of file