Repository: beam Updated Branches: refs/heads/master 2fa4fdecd -> f5714f220
[BEAM-2657] Create Solr IO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d00ff9e2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d00ff9e2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d00ff9e2 Branch: refs/heads/master Commit: d00ff9e215092af2666459b742f62c6b0bb4bff9 Parents: 2fa4fde Author: Cao Manh Dat <da...@apache.org> Authored: Sat Jul 22 14:38:07 2017 +0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Aug 8 11:54:09 2017 -0700 ---------------------------------------------------------------------- pom.xml | 8 +- .../sdk/io/common/IOTestPipelineOptions.java | 6 + sdks/java/io/pom.xml | 1 + sdks/java/io/solr/pom.xml | 147 ++++ .../beam/sdk/io/solr/AuthorizedSolrClient.java | 91 +++ .../beam/sdk/io/solr/JavaBinCodecCoder.java | 98 +++ .../org/apache/beam/sdk/io/solr/SolrIO.java | 717 +++++++++++++++++++ .../apache/beam/sdk/io/solr/package-info.java | 20 + .../beam/sdk/io/solr/JavaBinCodecCoderTest.java | 81 +++ .../org/apache/beam/sdk/io/solr/SolrIOTest.java | 269 +++++++ .../beam/sdk/io/solr/SolrIOTestUtils.java | 132 ++++ .../resources/cloud-minimal/conf/schema.xml | 29 + .../resources/cloud-minimal/conf/solrconfig.xml | 48 ++ sdks/java/javadoc/pom.xml | 5 + 14 files changed, 1651 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 80ab6e2..1bdaa97 100644 --- a/pom.xml +++ b/pom.xml @@ -524,7 +524,13 @@ <version>${project.version}</version> </dependency> - <dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-solr</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId> <version>${project.version}</version> http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java index 25ab929..256c94d 100644 --- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java +++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java @@ -71,6 +71,12 @@ public interface IOTestPipelineOptions extends TestPipelineOptions { Integer getElasticsearchHttpPort(); void setElasticsearchHttpPort(Integer value); + /* Solr */ + @Description("Address of Zookeeper server for Solr") + @Default.String("zookeeper-server") + String getSolrZookeeperServer(); + void setSolrZookeeperServer(String value); + /* Cassandra */ @Description("Host for Cassandra server (host name/ip address)") @Default.String("cassandra-host") http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 4e02aa8..c291e5d 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -56,6 +56,7 @@ <module>kinesis</module> <module>mongodb</module> <module>mqtt</module> + 
<module>solr</module> <module>xml</module> </modules> http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/solr/pom.xml b/sdks/java/io/solr/pom.xml new file mode 100644 index 0000000..a757a57 --- /dev/null +++ b/sdks/java/io/solr/pom.xml @@ -0,0 +1,147 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>beam-sdks-java-io-parent</artifactId> + <groupId>org.apache.beam</groupId> + <version>2.2.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>beam-sdks-java-io-solr</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: Solr</name> + <description>IO to read and write from/to Solr.</description> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-solrj</artifactId> + <version>5.5.4</version> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </dependency> + + <!-- compile dependencies --> + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.4.1</version> + <scope>provided</scope> + </dependency> + + <!-- test --> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-common</artifactId> + <scope>test</scope> + <classifier>tests</classifier> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + 
<artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-test-framework</artifactId> + <version>5.5.4</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-core</artifactId> + <version>5.5.4</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.carrotsearch.randomizedtesting</groupId> + <artifactId>randomizedtesting-runner</artifactId> + <version>2.3.2</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java new file mode 100644 index 0000000..44d7b88 --- /dev/null +++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/AuthorizedSolrClient.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solr; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.beam.sdk.io.solr.SolrIO.ConnectionConfiguration; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrResponse; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.CoreAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.response.CoreAdminResponse; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.params.SolrParams; + +/** + * Client for interacting with Solr. 
+ * @param <ClientT> type of SolrClient + */ +class AuthorizedSolrClient<ClientT extends SolrClient> implements Closeable { + private final ClientT solrClient; + private final String username; + private final String password; + + AuthorizedSolrClient(ClientT solrClient, ConnectionConfiguration configuration) { + checkArgument( + solrClient != null, + "solrClient can not be null"); + checkArgument( + configuration != null, + "configuration can not be null"); + this.solrClient = solrClient; + this.username = configuration.getUsername(); + this.password = configuration.getPassword(); + } + + QueryResponse query(String collection, SolrParams solrParams) + throws IOException, SolrServerException { + QueryRequest query = new QueryRequest(solrParams); + return process(collection, query); + } + + <ResponseT extends SolrResponse> ResponseT process(String collection, + SolrRequest<ResponseT> request) throws IOException, SolrServerException { + request.setBasicAuthCredentials(username, password); + return request.process(solrClient, collection); + } + + CoreAdminResponse process(CoreAdminRequest request) + throws IOException, SolrServerException { + return process(null, request); + } + + SolrResponse process(CollectionAdminRequest request) + throws IOException, SolrServerException { + return process(null, request); + } + + static ClusterState getClusterState( + AuthorizedSolrClient<CloudSolrClient> authorizedSolrClient) { + authorizedSolrClient.solrClient.connect(); + return authorizedSolrClient.solrClient.getZkStateReader().getClusterState(); + } + + @Override public void close() throws IOException { + solrClient.close(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java new file mode 100644 index 0000000..aef3c4b --- /dev/null +++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoder.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
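A minimal usage sketch of this wrapper, assuming a running SolrCloud cluster (the Zookeeper address, collection name, and credentials below are placeholders): every request is funneled through process(), which attaches the basic-auth credentials before execution.

    SolrIO.ConnectionConfiguration config =
        SolrIO.ConnectionConfiguration.create("localhost:9983")  // assumed ZK address
            .withBasicCredentials("solr", "SolrRocks");          // assumed credentials
    try (AuthorizedSolrClient<CloudSolrClient> client = config.createClient()) {
      // query() wraps the SolrQuery in a QueryRequest and routes it through
      // process(), so the request carries the configured credentials.
      QueryResponse response = client.query("my-collection", new SolrQuery("*:*"));
      System.out.println(response.getResults().getNumFound());
    }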
+ */ +package org.apache.beam.sdk.io.solr; + +import com.google.auto.service.AutoService; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviderRegistrar; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.util.VarInt; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.commons.compress.utils.BoundedInputStream; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.JavaBinCodec; + +/** A {@link Coder} that encodes using {@link JavaBinCodec}. */ +class JavaBinCodecCoder<T> extends AtomicCoder<T> { + private final Class<T> clazz; + + private JavaBinCodecCoder(Class<T> clazz) { + this.clazz = clazz; + } + + public static <T> JavaBinCodecCoder<T> of(Class<T> clazz) { + return new JavaBinCodecCoder<>(clazz); + } + + @Override + public void encode(T value, OutputStream outStream) throws IOException { + if (value == null) { + throw new CoderException("cannot encode a null SolrDocument"); + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + JavaBinCodec codec = new JavaBinCodec(); + codec.marshal(value, baos); + + byte[] bytes = baos.toByteArray(); + VarInt.encode(bytes.length, outStream); + outStream.write(bytes); + } + + @Override + public T decode(InputStream inStream) throws IOException { + DataInputStream in = new DataInputStream(inStream); + + int len = VarInt.decodeInt(in); + if (len < 0) { + throw new CoderException("Invalid encoded SolrDocument length: " + len); + } + + JavaBinCodec codec = new JavaBinCodec(); + return (T) codec.unmarshal(new BoundedInputStream(in, len)); + } + + @Override + public TypeDescriptor<T> getEncodedTypeDescriptor() { + return TypeDescriptor.of(clazz); + } + + @AutoService(CoderProviderRegistrar.class) + public static class Provider implements CoderProviderRegistrar { + @Override + public List<CoderProvider> getCoderProviders() { + return Arrays.asList( + CoderProviders.forCoder( + TypeDescriptor.of(SolrDocument.class), JavaBinCodecCoder.of(SolrDocument.class)), + CoderProviders.forCoder( + TypeDescriptor.of(SolrInputDocument.class), + JavaBinCodecCoder.of(SolrInputDocument.class))); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java new file mode 100644 index 0000000..c137eea --- /dev/null +++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java @@ -0,0 +1,717 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
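A minimal round trip through the coder above, assuming Beam's CoderUtils helper (the same entry point the unit tests below use); the document contents are placeholders:

    SolrDocument doc = new SolrDocument();
    doc.put("id", "1");
    Coder<SolrDocument> coder = JavaBinCodecCoder.of(SolrDocument.class);
    // encode() writes a VarInt length followed by the JavaBin payload, so
    // decode() can bound its read of the stream with BoundedInputStream.
    byte[] bytes = CoderUtils.encodeToByteArray(coder, doc);
    SolrDocument roundTripped = CoderUtils.decodeFromByteArray(coder, bytes);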
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solr; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.io.IOException; +import java.io.Serializable; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.http.client.HttpClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CoreAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.request.schema.SchemaRequest; +import org.apache.solr.client.solrj.response.CoreAdminResponse; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.schema.SchemaResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CoreAdminParams; +import org.apache.solr.common.params.CursorMarkParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; + +/** + * Transforms for reading and writing data from/to Solr. + * + * <h3>Reading from Solr</h3> + * + * <p>{@link SolrIO#read SolrIO.read()} returns a bounded {@link PCollection + * PCollection<SolrDocument>} representing Solr documents. 
+ * + * <p>To configure the {@link SolrIO#read}, you have to provide a connection configuration + * containing the Zookeeper address of the Solr cluster, and the collection name. The following + * example illustrates options for configuring the source: + * + * <pre>{@code + * SolrIO.ConnectionConfiguration conn = SolrIO.ConnectionConfiguration.create("127.0.0.1:9983"); + * // Optionally: .withBasicCredentials(username, password) + * + * PCollection<SolrDocument> docs = p.apply( + * SolrIO.read().from("my-collection").withConnectionConfiguration(conn)); + * + * }</pre> + * + * <p>You can specify a query on the {@code read()} using {@code withQuery()}. + * + * <h3>Writing to Solr</h3> + * + * <p>To write documents to Solr, use {@link SolrIO#write SolrIO.write()}, which writes Solr + * documents from a {@link PCollection PCollection<SolrInputDocument>} (which can be bounded + * or unbounded). + * + * <p>To configure {@link SolrIO#write SolrIO.write()}, similar to the read, you have to provide a + * connection configuration, and a collection name. For instance: + * + * <pre>{@code + * PCollection<SolrInputDocument> inputDocs = ...; + * inputDocs.apply(SolrIO.write().to("my-collection").withConnectionConfiguration(conn)); + * + * }</pre> + */ +@Experimental(Experimental.Kind.SOURCE_SINK) +public class SolrIO { + + public static Read read() { + // 1000 for batch size is good enough in many cases, + // ex: if document size is large, around 10KB, the response's size will be around 10MB + // if document size is small, around 1KB, the response's size will be around 1MB + return new AutoValue_SolrIO_Read.Builder().setBatchSize(1000).setQuery("*:*").build(); + } + + public static Write write() { + // 1000 for batch size is good enough in many cases, + // ex: if document size is large, around 10KB, the request's size will be around 10MB + // if document size is small, around 1KB, the request's size will be around 1MB + return new AutoValue_SolrIO_Write.Builder().setMaxBatchSize(1000).build(); + } + + private SolrIO() {} + + /** A POJO describing a connection configuration to Solr. */ + @AutoValue + public abstract static class ConnectionConfiguration implements Serializable { + + abstract String getZkHost(); + + @Nullable + abstract String getUsername(); + + @Nullable + abstract String getPassword(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setZkHost(String zkHost); + + abstract Builder setUsername(String username); + + abstract Builder setPassword(String password); + + abstract ConnectionConfiguration build(); + } + + /** + * Creates a new Solr connection configuration. + * + * @param zkHost host of Zookeeper + * @return the connection configuration object + */ + public static ConnectionConfiguration create(String zkHost) { + checkArgument(zkHost != null, "zkHost can not be null"); + return new AutoValue_SolrIO_ConnectionConfiguration.Builder().setZkHost(zkHost).build(); + } + + /** If Solr basic authentication is enabled, provide the username and password. 
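For example, a minimal authenticated configuration might look like this sketch (the Zookeeper ensemble and credentials are placeholders):

    SolrIO.ConnectionConfiguration conn =
        SolrIO.ConnectionConfiguration.create("zk1:2181,zk2:2181/solr")
            .withBasicCredentials("solr", "SolrRocks");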
*/ + public ConnectionConfiguration withBasicCredentials(String username, String password) { + checkArgument(username != null, "username can not be null"); + checkArgument(!username.isEmpty(), "username can not be empty"); + checkArgument(password != null, "password can not be null"); + checkArgument(!password.isEmpty(), "password can not be empty"); + return builder().setUsername(username).setPassword(password).build(); + } + + private void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("zkHost", getZkHost())); + builder.addIfNotNull(DisplayData.item("username", getUsername())); + } + + private HttpClient createHttpClient() { + // This is a bug in Solr: if we don't create a customized HttpClient, + // UpdateRequest with the commit flag will throw an authentication error. + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(HttpClientUtil.PROP_BASIC_AUTH_USER, getUsername()); + params.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, getPassword()); + return HttpClientUtil.createClient(params); + } + + AuthorizedSolrClient<CloudSolrClient> createClient() throws MalformedURLException { + CloudSolrClient solrClient = new CloudSolrClient(getZkHost(), createHttpClient()); + return new AuthorizedSolrClient<>(solrClient, this); + } + + AuthorizedSolrClient<HttpSolrClient> createClient(String shardUrl) { + HttpSolrClient solrClient = new HttpSolrClient(shardUrl, createHttpClient()); + return new AuthorizedSolrClient<>(solrClient, this); + } + } + + /** A {@link PTransform} reading data from Solr. */ + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<SolrDocument>> { + private static final long MAX_BATCH_SIZE = 10000L; + + @Nullable + abstract ConnectionConfiguration getConnectionConfiguration(); + + @Nullable + abstract String getCollection(); + + abstract String getQuery(); + + abstract int getBatchSize(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration); + + abstract Builder setQuery(String query); + + abstract Builder setBatchSize(int batchSize); + + abstract Builder setCollection(String collection); + + abstract Read build(); + } + + /** Provide the Solr connection configuration object. */ + public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) { + checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null"); + return builder().setConnectionConfiguration(connectionConfiguration).build(); + } + + /** + * Provide the name of the collection to read from. + * + * @param collection the collection toward which the requests will be issued + */ + public Read from(String collection) { + checkArgument(collection != null, "collection can not be null"); + return builder().setCollection(collection).build(); + } + + /** + * Provide a query used while reading from Solr. + * + * @param query the query. See <a + * href="https://cwiki.apache.org/confluence/display/solr/The+Standard+Query+Parser">Solr + * Query </a> + */ + public Read withQuery(String query) { + checkArgument(query != null, "query can not be null"); + checkArgument(!query.isEmpty(), "query can not be empty"); + return builder().setQuery(query).build(); + } + + /** + * Provide a size for the cursor read. See <a + * href="https://cwiki.apache.org/confluence/display/solr/Pagination+of+Results">cursor API</a>. + * Default is 1000. Maximum is 10,000. 
If documents are small, increasing batch size might + improve read performance. If documents are big, you might need to decrease batchSize. + * + * @param batchSize number of documents read in each scroll read + */ + @VisibleForTesting + Read withBatchSize(int batchSize) { + // TODO remove this configuration, we can figure out the best number + // by tuning batchSize when pipelines run. + checkArgument( + batchSize > 0 && batchSize < MAX_BATCH_SIZE, + "Valid values for batchSize are 1 (inclusive) to %s (exclusive), but was: %s ", + MAX_BATCH_SIZE, + batchSize); + return builder().setBatchSize(batchSize).build(); + } + + @Override + public PCollection<SolrDocument> expand(PBegin input) { + return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedSolrSource(this, null))); + } + + @Override + public void validate(PipelineOptions options) { + checkState( + getConnectionConfiguration() != null, + "Need to set connection configuration using withConnectionConfiguration()"); + checkState(getCollection() != null, "Need to set collection name using from()"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.addIfNotNull(DisplayData.item("query", getQuery())); + getConnectionConfiguration().populateDisplayData(builder); + } + } + + /** A POJO describing a replica of Solr. */ + @AutoValue + abstract static class ReplicaInfo implements Serializable { + public abstract String coreName(); + + public abstract String coreUrl(); + + public abstract String baseUrl(); + + static ReplicaInfo create(Replica replica) { + return new AutoValue_SolrIO_ReplicaInfo( + replica.getStr(ZkStateReader.CORE_NAME_PROP), + replica.getCoreUrl(), + replica.getStr(ZkStateReader.BASE_URL_PROP)); + } + } + + /** A {@link BoundedSource} reading from Solr. */ + @VisibleForTesting + static class BoundedSolrSource extends BoundedSource<SolrDocument> { + + private final SolrIO.Read spec; + // replica is the info of the shard where the source will read the documents + @Nullable private final ReplicaInfo replica; + + BoundedSolrSource(Read spec, @Nullable Replica replica) { + this.spec = spec; + this.replica = replica == null ? null : ReplicaInfo.create(replica); + } + + @Override + public List<? extends BoundedSource<SolrDocument>> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + ConnectionConfiguration connectionConfig = spec.getConnectionConfiguration(); + List<BoundedSolrSource> sources = new ArrayList<>(); + try (AuthorizedSolrClient<CloudSolrClient> client = connectionConfig.createClient()) { + String collection = spec.getCollection(); + final ClusterState clusterState = AuthorizedSolrClient.getClusterState(client); + DocCollection docCollection = clusterState.getCollection(collection); + for (Slice slice : docCollection.getSlices()) { + ArrayList<Replica> replicas = new ArrayList<>(slice.getReplicas()); + Collections.shuffle(replicas); + // Load balancing by randomly picking an active replica + Replica randomActiveReplica = null; + for (Replica replica : replicas) { + // We need to check both the state of the replica and the live nodes + // to make sure that the replica is alive + if (replica.getState() == Replica.State.ACTIVE + && clusterState.getLiveNodes().contains(replica.getNodeName())) { + randomActiveReplica = replica; + break; + } + } + // TODO: in case this replica goes inactive while the pipeline runs, + // we should pick another active replica of this shard. 
+ checkState( + randomActiveReplica != null, + "Cannot find an active replica for slice %s", + slice.getName()); + sources.add(new BoundedSolrSource(spec, randomActiveReplica)); + } + } + return sources; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws IOException { + if (replica != null) { + return getEstimatedSizeOfShard(replica); + } else { + return getEstimatedSizeOfCollection(); + } + } + + private long getEstimatedSizeOfShard(ReplicaInfo replica) throws IOException { + try (AuthorizedSolrClient solrClient = + spec.getConnectionConfiguration().createClient(replica.baseUrl())) { + CoreAdminRequest req = new CoreAdminRequest(); + req.setAction(CoreAdminParams.CoreAdminAction.STATUS); + req.setIndexInfoNeeded(true); + CoreAdminResponse response; + try { + response = solrClient.process(req); + } catch (SolrServerException e) { + throw new IOException("Cannot get core status from " + replica, e); + } + NamedList<Object> coreStatus = response.getCoreStatus(replica.coreName()); + NamedList<Object> indexStats = (NamedList<Object>) coreStatus.get("index"); + return (long) indexStats.get("sizeInBytes"); + } + } + + private long getEstimatedSizeOfCollection() throws IOException { + long sizeInBytes = 0; + ConnectionConfiguration config = spec.getConnectionConfiguration(); + try (AuthorizedSolrClient<CloudSolrClient> solrClient = config.createClient()) { + DocCollection docCollection = + AuthorizedSolrClient.getClusterState(solrClient).getCollection(spec.getCollection()); + if (docCollection.getSlices().isEmpty()) { + return 0; + } + + ArrayList<Slice> slices = new ArrayList<>(docCollection.getSlices()); + Collections.shuffle(slices); + ExecutorService executor = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setThreadFactory(MoreExecutors.platformThreadFactory()) + .setDaemon(true) + .setNameFormat("solrio-size-of-collection-estimation") + .build()); + try { + ArrayList<Future<Long>> futures = new ArrayList<>(); + for (int i = 0; i < 100 && i < slices.size(); i++) { + Slice slice = slices.get(i); + final Replica replica = slice.getLeader(); + Future<Long> future = + executor.submit( + new Callable<Long>() { + @Override + public Long call() throws Exception { + return getEstimatedSizeOfShard(ReplicaInfo.create(replica)); + } + }); + futures.add(future); + } + for (Future<Long> future : futures) { + try { + sizeInBytes += future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException("Cannot estimate size of shard", e.getCause()); + } + } + } finally { + executor.shutdownNow(); + } + + if (slices.size() <= 100) { + return sizeInBytes; + } + return (sizeInBytes / 100) * slices.size(); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + spec.populateDisplayData(builder); + if (replica != null) { + builder.addIfNotNull(DisplayData.item("shardUrl", replica.coreUrl())); + } + } + + @Override + public BoundedReader<SolrDocument> createReader(PipelineOptions options) throws IOException { + return new BoundedSolrReader(this); + } + + @Override + public void validate() { + spec.validate(null); + } + + @Override + public Coder<SolrDocument> getOutputCoder() { + return JavaBinCodecCoder.of(SolrDocument.class); + } + } + + private static class BoundedSolrReader extends BoundedSource.BoundedReader<SolrDocument> { + + private final BoundedSolrSource source; + + private AuthorizedSolrClient solrClient; + 
private SolrDocument current; + private String cursorMark; + private Iterator<SolrDocument> batchIterator; + private boolean done; + private String uniqueKey; + + private BoundedSolrReader(BoundedSolrSource source) { + this.source = source; + this.cursorMark = CursorMarkParams.CURSOR_MARK_START; + } + + @Override + public boolean start() throws IOException { + if (source.replica != null) { + solrClient = + source.spec.getConnectionConfiguration().createClient(source.replica.baseUrl()); + } else { + solrClient = source.spec.getConnectionConfiguration().createClient(); + } + SchemaRequest.UniqueKey uniqueKeyRequest = new SchemaRequest.UniqueKey(); + try { + String collection = source.spec.getCollection(); + SchemaResponse.UniqueKeyResponse uniqueKeyResponse = + (SchemaResponse.UniqueKeyResponse) solrClient.process(collection, uniqueKeyRequest); + uniqueKey = uniqueKeyResponse.getUniqueKey(); + } catch (SolrServerException e) { + throw new IOException("Can not get unique key from solr", e); + } + return advance(); + } + + private SolrQuery getQueryParams(BoundedSolrSource source) { + String query = source.spec.getQuery(); + if (query == null) { + query = "*:*"; + } + SolrQuery solrQuery = new SolrQuery(query); + solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark); + solrQuery.setRows(source.spec.getBatchSize()); + solrQuery.addSort(uniqueKey, SolrQuery.ORDER.asc); + if (source.replica != null) { + solrQuery.setDistrib(false); + } + return solrQuery; + } + + private void updateCursorMark(QueryResponse response) { + if (cursorMark.equals(response.getNextCursorMark())) { + done = true; + } + cursorMark = response.getNextCursorMark(); + } + + @Override + public boolean advance() throws IOException { + if (batchIterator != null && batchIterator.hasNext()) { + current = batchIterator.next(); + return true; + } else { + SolrQuery solrQuery = getQueryParams(source); + try { + QueryResponse response; + if (source.replica != null) { + response = solrClient.query(source.replica.coreName(), solrQuery); + } else { + response = solrClient.query(source.spec.getCollection(), solrQuery); + } + updateCursorMark(response); + return readNextBatchAndReturnFirstDocument(response); + } catch (SolrServerException e) { + throw new IOException(e); + } + } + } + + private boolean readNextBatchAndReturnFirstDocument(QueryResponse response) { + if (done) { + current = null; + batchIterator = null; + return false; + } + + batchIterator = response.getResults().iterator(); + current = batchIterator.next(); + return true; + } + + @Override + public SolrDocument getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current; + } + + @Override + public void close() throws IOException { + solrClient.close(); + } + + @Override + public BoundedSource<SolrDocument> getCurrentSource() { + return source; + } + } + + /** A {@link PTransform} writing data to Solr. 
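The reader above follows Solr's cursorMark contract: each page re-issues the query with the cursor returned by the previous page, and reading stops once the cursor no longer advances. In plain SolrJ, the same pattern looks roughly like this sketch (the client, collection name, and unique key are placeholders):

    SolrQuery q = new SolrQuery("*:*");
    q.setRows(100);
    q.addSort("id", SolrQuery.ORDER.asc); // cursors require a sort on the unique key
    String cursor = CursorMarkParams.CURSOR_MARK_START;
    while (true) {
      q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursor);
      QueryResponse rsp = solrClient.query("my-collection", q);
      // consume rsp.getResults() here
      String next = rsp.getNextCursorMark();
      if (cursor.equals(next)) {
        break; // cursor did not advance: all documents have been read
      }
      cursor = next;
    }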
*/ + @AutoValue + public abstract static class Write extends PTransform<PCollection<SolrInputDocument>, PDone> { + + @Nullable + abstract ConnectionConfiguration getConnectionConfiguration(); + + @Nullable + abstract String getCollection(); + + abstract int getMaxBatchSize(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration); + + abstract Builder setCollection(String collection); + + abstract Builder setMaxBatchSize(int maxBatchSize); + + abstract Write build(); + } + + /** Provide the Solr connection configuration object. */ + public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) { + checkArgument(connectionConfiguration != null, "connectionConfiguration can not be null"); + return builder().setConnectionConfiguration(connectionConfiguration).build(); + } + + /** + * Provide the name of the collection to write to. + * + * @param collection the collection toward which the requests will be issued + */ + public Write to(String collection) { + checkArgument(collection != null, "collection can not be null"); + return builder().setCollection(collection).build(); + } + + /** + * Provide a maximum size in number of documents for the batch. Depending on the execution + * engine, the size of bundles may vary; this sets the maximum size. Change this if you need + * smaller batches. + * + * @param batchSize maximum batch size in number of documents + */ + @VisibleForTesting + Write withMaxBatchSize(int batchSize) { + // TODO remove this configuration, we can figure out the best number + // by tuning batchSize when pipelines run. + checkArgument(batchSize > 0, "batchSize must be larger than 0, but was: %s", batchSize); + return builder().setMaxBatchSize(batchSize).build(); + } + + @Override + public void validate(PipelineOptions options) { + checkState( + getConnectionConfiguration() != null, + "Need to set connection configuration via withConnectionConfiguration()"); + checkState(getCollection() != null, "Need to set collection name via to()"); + } + + @Override + public PDone expand(PCollection<SolrInputDocument> input) { + input.apply(ParDo.of(new WriteFn(this))); + return PDone.in(input.getPipeline()); + } + + @VisibleForTesting + static class WriteFn extends DoFn<SolrInputDocument, Void> { + + private final Write spec; + + private transient AuthorizedSolrClient solrClient; + private Collection<SolrInputDocument> batch; + + WriteFn(Write spec) { + this.spec = spec; + } + + @Setup + public void createClient() throws Exception { + solrClient = spec.getConnectionConfiguration().createClient(); + } + + @StartBundle + public void startBundle(StartBundleContext context) throws Exception { + batch = new ArrayList<>(); + } + + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + SolrInputDocument document = context.element(); + batch.add(document); + if (batch.size() >= spec.getMaxBatchSize()) { + flushBatch(); + } + } + + @FinishBundle + public void finishBundle(FinishBundleContext context) throws Exception { + flushBatch(); + } + + private void flushBatch() throws IOException { + if (batch.isEmpty()) { + return; + } + try { + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.add(batch); + solrClient.process(spec.getCollection(), updateRequest); + } catch (SolrServerException e) { + throw new IOException("Error writing to Solr", e); + } finally { + batch.clear(); + } + } + + 
@Teardown + public void closeClient() throws Exception { + if (solrClient != null) { + solrClient.close(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java new file mode 100644 index 0000000..83867ed --- /dev/null +++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing from/to Solr. */ +package org.apache.beam.sdk.io.solr; http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java new file mode 100644 index 0000000..1fb435d --- /dev/null +++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/JavaBinCodecCoderTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solr; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.solr.common.SolrDocument; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test case for {@link JavaBinCodecCoder}. */ +@RunWith(JUnit4.class) +public class JavaBinCodecCoderTest { + private static final Coder<SolrDocument> TEST_CODER = JavaBinCodecCoder.of(SolrDocument.class); + private static final List<SolrDocument> TEST_VALUES = new ArrayList<>(); + + static { + SolrDocument doc = new SolrDocument(); + doc.put("id", "1"); + doc.put("content", "wheel on the bus"); + doc.put("_version_", 1573597324260671488L); + TEST_VALUES.add(doc); + + doc = new SolrDocument(); + doc.put("id", "2"); + doc.put("content", "goes round and round"); + doc.put("_version_", 1573597324260671489L); + TEST_VALUES.add(doc); + } + + @Test + public void testDecodeEncodeEqual() throws Exception { + for (SolrDocument value : TEST_VALUES) { + CoderProperties.coderDecodeEncodeContentsInSameOrder(TEST_CODER, value); + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, value); + } + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void encodeNullThrowsCoderException() throws Exception { + thrown.expect(CoderException.class); + thrown.expectMessage("cannot encode a null SolrDocument"); + + CoderUtils.encodeToBase64(TEST_CODER, null); + } + + @Test + public void testEncodedTypeDescriptor() throws Exception { + assertThat( + TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(SolrDocument.class))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java new file mode 100644 index 0000000..4358ce4 --- /dev/null +++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solr; + +import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import com.google.common.io.BaseEncoding; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; + +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SourceTestUtils; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.values.PCollection; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.security.Sha256AuthenticationProvider; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A test of {@link SolrIO} on an independent Solr instance. */ +@ThreadLeakScope(value = ThreadLeakScope.Scope.NONE) +@SolrTestCaseJ4.SuppressSSL +public class SolrIOTest extends SolrCloudTestCase { + private static final Logger LOG = LoggerFactory.getLogger(SolrIOTest.class); + + private static final String SOLR_COLLECTION = "beam"; + private static final int NUM_SHARDS = 3; + private static final long NUM_DOCS = 400L; + private static final int NUM_SCIENTISTS = 10; + private static final int BATCH_SIZE = 200; + + private static AuthorizedSolrClient<CloudSolrClient> solrClient; + private static SolrIO.ConnectionConfiguration connectionConfiguration; + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void beforeClass() throws Exception { + // setup credential for solr user, + // See https://cwiki.apache.org/confluence/display/solr/Basic+Authentication+Plugin + String password = "SolrRocks"; + // salt's size can be arbitrary + byte[] salt = new byte[random().nextInt(30) + 1]; + random().nextBytes(salt); + String base64Salt = BaseEncoding.base64().encode(salt); + String sha56 = Sha256AuthenticationProvider.sha256(password, base64Salt); + String credential = sha56 + " " + base64Salt; + String securityJson = + "{" + + "'authentication':{" + + " 'blockUnknown': true," + + " 'class':'solr.BasicAuthPlugin'," + + " 'credentials':{'solr':'" + + credential + + "'}}" + + "}"; + + configureCluster(3).addConfig("conf", getFile("cloud-minimal/conf").toPath()).configure(); + ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); + zkStateReader + .getZkClient() + .setData("/security.json", securityJson.getBytes(Charset.defaultCharset()), true); + String zkAddress = cluster.getZkServer().getZkAddress(); + connectionConfiguration = + 
SolrIO.ConnectionConfiguration.create(zkAddress).withBasicCredentials("solr", password); + solrClient = connectionConfiguration.createClient(); + SolrIOTestUtils.createCollection(SOLR_COLLECTION, NUM_SHARDS, 1, solrClient); + } + + @AfterClass + public static void afterClass() throws Exception { + solrClient.close(); + } + + @Before + public void before() throws Exception { + SolrIOTestUtils.clearCollection(SOLR_COLLECTION, solrClient); + } + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testBadCredentials() throws IOException { + thrown.expect(SolrException.class); + + String zkAddress = cluster.getZkServer().getZkAddress(); + SolrIO.ConnectionConfiguration connectionConfiguration = + SolrIO.ConnectionConfiguration.create(zkAddress) + .withBasicCredentials("solr", "wrongpassword"); + try (AuthorizedSolrClient solrClient = connectionConfiguration.createClient()) { + SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient); + } + } + + @Test + public void testSizes() throws Exception { + SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient); + + PipelineOptions options = PipelineOptionsFactory.create(); + SolrIO.Read read = + SolrIO.read().withConnectionConfiguration(connectionConfiguration).from(SOLR_COLLECTION); + SolrIO.BoundedSolrSource initialSource = new SolrIO.BoundedSolrSource(read, null); + // can't use an equality assert as Solr collections never have the same size + // (due to the internal Lucene implementation) + long estimatedSize = initialSource.getEstimatedSizeBytes(options); + LOG.info("Estimated size: {}", estimatedSize); + assertThat( + "Wrong estimated size below minimum", + estimatedSize, + greaterThan(SolrIOTestUtils.MIN_DOC_SIZE * NUM_DOCS)); + assertThat( + "Wrong estimated size beyond maximum", + estimatedSize, + lessThan(SolrIOTestUtils.MAX_DOC_SIZE * NUM_DOCS)); + } + + @Test + public void testRead() throws Exception { + SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient); + + PCollection<SolrDocument> output = + pipeline.apply( + SolrIO.read() + .withConnectionConfiguration(connectionConfiguration) + .from(SOLR_COLLECTION) + .withBatchSize(101)); + PAssert.thatSingleton(output.apply("Count", Count.<SolrDocument>globally())) + .isEqualTo(NUM_DOCS); + pipeline.run(); + } + + @Test + public void testReadWithQuery() throws Exception { + SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient); + + PCollection<SolrDocument> output = + pipeline.apply( + SolrIO.read() + .withConnectionConfiguration(connectionConfiguration) + .from(SOLR_COLLECTION) + .withQuery("scientist:Franklin")); + PAssert.thatSingleton(output.apply("Count", Count.<SolrDocument>globally())) + .isEqualTo(NUM_DOCS / NUM_SCIENTISTS); + pipeline.run(); + } + + @Test + public void testWrite() throws Exception { + List<SolrInputDocument> data = SolrIOTestUtils.createDocuments(NUM_DOCS); + SolrIO.Write write = + SolrIO.write().withConnectionConfiguration(connectionConfiguration).to(SOLR_COLLECTION); + pipeline.apply(Create.of(data)).apply(write); + pipeline.run(); + + long currentNumDocs = SolrIOTestUtils.commitAndGetCurrentNumDocs(SOLR_COLLECTION, solrClient); + assertEquals(NUM_DOCS, currentNumDocs); + + QueryResponse response = solrClient.query(SOLR_COLLECTION, new SolrQuery("scientist:Lovelace")); + assertEquals(NUM_DOCS / NUM_SCIENTISTS, response.getResults().getNumFound()); + } + + @Test + public void testWriteWithMaxBatchSize() throws Exception { + SolrIO.Write write = + SolrIO.write() 
.withConnectionConfiguration(connectionConfiguration) + .to(SOLR_COLLECTION) + .withMaxBatchSize(BATCH_SIZE); + // write bundle size is the runner's decision, we cannot force a bundle size, + // so we test the Writer as a DoFn outside of a runner. + try (DoFnTester<SolrInputDocument, Void> fnTester = + DoFnTester.of(new SolrIO.Write.WriteFn(write))) { + List<SolrInputDocument> input = SolrIOTestUtils.createDocuments(NUM_DOCS); + long numDocsProcessed = 0; + long numDocsInserted = 0; + for (SolrInputDocument document : input) { + fnTester.processElement(document); + numDocsProcessed++; + // test every 100 docs to avoid overloading Solr + if ((numDocsProcessed % 100) == 0) { + // force a commit after inserting so that the inserted docs + // are searchable immediately + long currentNumDocs = + SolrIOTestUtils.commitAndGetCurrentNumDocs(SOLR_COLLECTION, solrClient); + if ((numDocsProcessed % BATCH_SIZE) == 0) { + /* bundle end */ + assertEquals( + "we are at the end of a bundle, we should have inserted all processed documents", + numDocsProcessed, + currentNumDocs); + numDocsInserted = currentNumDocs; + } else { + /* not bundle end */ + assertEquals( + "we are not at the end of a bundle, we should have inserted no more documents", + numDocsInserted, + currentNumDocs); + } + } + } + } + } + + @Test + public void testSplit() throws Exception { + SolrIOTestUtils.insertTestDocuments(SOLR_COLLECTION, NUM_DOCS, solrClient); + + PipelineOptions options = PipelineOptionsFactory.create(); + SolrIO.Read read = + SolrIO.read().withConnectionConfiguration(connectionConfiguration).from(SOLR_COLLECTION); + SolrIO.BoundedSolrSource initialSource = new SolrIO.BoundedSolrSource(read, null); + // desiredBundleSize is ignored for now + int desiredBundleSizeBytes = 0; + List<? extends BoundedSource<SolrDocument>> splits = + initialSource.split(desiredBundleSizeBytes, options); + SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); + + int expectedNumSplits = NUM_SHARDS; + assertEquals(expectedNumSplits, splits.size()); + int nonEmptySplits = 0; + for (BoundedSource<SolrDocument> subSource : splits) { + if (readFromSource(subSource, options).size() > 0) { + nonEmptySplits += 1; + } + } + // docs are hashed by id to shards; in this test, NUM_DOCS >> NUM_SHARDS, + // therefore, no shard can be empty. + assertEquals("Wrong number of non-empty splits", expectedNumSplits, nonEmptySplits); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java new file mode 100644 index 0000000..808cd0f --- /dev/null +++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solr;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+
+/** Test utilities to use with {@link SolrIO}. */
+public class SolrIOTestUtils {
+  public static final long MIN_DOC_SIZE = 40L;
+  public static final long MAX_DOC_SIZE = 90L;
+
+  /** Creates a collection with the given number of shards and replication factor. */
+  static void createCollection(
+      String collection, int numShards, int replicationFactor, AuthorizedSolrClient client)
+      throws Exception {
+    CollectionAdminRequest.Create create =
+        new CollectionAdminRequest.Create()
+            .setCollectionName(collection)
+            .setNumShards(numShards)
+            .setReplicationFactor(replicationFactor)
+            .setMaxShardsPerNode(2);
+    client.process(create);
+  }
+
+  /** Inserts the given number of test documents into Solr. */
+  static void insertTestDocuments(String collection, long numDocs, AuthorizedSolrClient client)
+      throws IOException {
+    List<SolrInputDocument> data = createDocuments(numDocs);
+    try {
+      UpdateRequest updateRequest = new UpdateRequest();
+      updateRequest.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+      updateRequest.add(data);
+      client.process(collection, updateRequest);
+    } catch (SolrServerException e) {
+      throw new IOException("Failed to insert test documents into collection", e);
+    }
+  }
+
+  /** Deletes the given collection. */
+  static void deleteCollection(String collection, AuthorizedSolrClient client) throws IOException {
+    try {
+      CollectionAdminRequest.Delete delete =
+          new CollectionAdminRequest.Delete().setCollectionName(collection);
+      client.process(delete);
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Clears the given collection. */
+  static void clearCollection(String collection, AuthorizedSolrClient client) throws IOException {
+    try {
+      UpdateRequest updateRequest = new UpdateRequest();
+      updateRequest.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+      updateRequest.deleteByQuery("*:*");
+      client.process(collection, updateRequest);
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Forces a commit of the given collection to make recently inserted documents available for
+   * search.
+   *
+   * @return The number of docs in the index
+   */
+  static long commitAndGetCurrentNumDocs(String collection, AuthorizedSolrClient client)
+      throws IOException {
+    SolrQuery solrQuery = new SolrQuery("*:*");
+    solrQuery.setRows(0);
+    try {
+      UpdateRequest update = new UpdateRequest();
+      update.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+      client.process(collection, update);
+
+      return client.query(collection, solrQuery).getResults().getNumFound();
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Generates a list of test documents for insertion.
+   *
+   * @return the list of {@link SolrInputDocument} representing the test documents
+   */
+  static List<SolrInputDocument> createDocuments(long numDocs) {
+    String[] scientists = {
+      "Lovelace",
+      "Franklin",
+      "Meitner",
+      "Hopper",
+      "Curie",
+      "Faraday",
+      "Newton",
+      "Bohr",
+      "Galilei",
+      "Maxwell"
+    };
+    ArrayList<SolrInputDocument> data = new ArrayList<>();
+    for (int i = 0; i < numDocs; i++) {
+      int index = i % scientists.length;
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.setField("id", String.valueOf(i));
+      doc.setField("scientist", scientists[index]);
+      data.add(doc);
+    }
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..08a1716
--- /dev/null
+++ b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/schema.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+  <fieldType name="string" class="solr.StrField"/>
+  <fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <dynamicField name="*" type="string" indexed="true" stored="true"/>
+  <!-- for versioning -->
+  <field name="_version_" type="long" indexed="true" stored="true"/>
+  <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+  <field name="id" type="string" indexed="true" stored="true"/>
+  <dynamicField name="*_s" type="string" indexed="true" stored="true"/>
+  <uniqueKey>id</uniqueKey>
+</schema>

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..8da7d28
--- /dev/null
+++ b/sdks/java/io/solr/src/test/resources/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+  </updateHandler>

+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+</config>
+

http://git-wip-us.apache.org/repos/asf/beam/blob/d00ff9e2/sdks/java/javadoc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml
index e1adb79..1fb6e41 100644
--- a/sdks/java/javadoc/pom.xml
+++ b/sdks/java/javadoc/pom.xml
@@ -172,6 +172,11 @@
       <artifactId>beam-sdks-java-io-mqtt</artifactId>
     </dependency>

+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-solr</artifactId>
+    </dependency>
+
    <!-- provided and optional dependencies.-->
    <dependency>
      <groupId>com.google.auto.service</groupId>
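----------------------------------------------------------------------

For reference, below is a minimal usage sketch of the new SolrIO transforms, mirroring the API exercised by the tests in this commit (ConnectionConfiguration.create/withBasicCredentials, read().from()/withQuery()/withBatchSize(), write().to()/withMaxBatchSize()). It is not part of the patch: the Zookeeper address, credentials, and collection name are hypothetical placeholders, a reachable SolrCloud cluster is assumed, and coders for the Solr document types are assumed to be available (the commit adds JavaBinCodecCoder for that purpose).

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.solr.SolrIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.solr.common.SolrDocument;
    import org.apache.solr.common.SolrInputDocument;

    public class SolrIOSketch {
      public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Connection configuration: a Zookeeper address plus optional
        // basic-auth credentials (placeholder values, not real endpoints).
        SolrIO.ConnectionConfiguration conn =
            SolrIO.ConnectionConfiguration.create("zk-host:2181")
                .withBasicCredentials("solr", "secret");

        // Read: stream documents matching a Solr query out of a collection,
        // fetching up to 100 documents per request.
        PCollection<SolrDocument> docs =
            pipeline.apply(
                SolrIO.read()
                    .withConnectionConfiguration(conn)
                    .from("my-collection")
                    .withQuery("scientist:Franklin")
                    .withBatchSize(100));
        // ... downstream transforms on 'docs' would go here.

        // Write: send SolrInputDocuments to a collection, batching up to
        // 1000 documents per update request.
        SolrInputDocument doc = new SolrInputDocument();
        doc.setField("id", "1");
        doc.setField("scientist", "Lovelace");
        pipeline
            .apply(Create.of(doc))
            .apply(
                SolrIO.write()
                    .withConnectionConfiguration(conn)
                    .to("my-collection")
                    .withMaxBatchSize(1000));

        pipeline.run().waitUntilFinish();
      }
    }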