Author: markus
Date: Wed Feb 3 13:51:10 2016
New Revision: 1728313

URL: http://svn.apache.org/viewvc?rev=1728313&view=rev
Log:
NUTCH-2197 Add Solr 5 cloud indexer support
Modified: nutch/trunk/CHANGES.txt nutch/trunk/src/plugin/indexer-solr/ivy.xml nutch/trunk/src/plugin/indexer-solr/plugin.xml nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java Modified: nutch/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1728313&r1=1728312&r2=1728313&view=diff ============================================================================== --- nutch/trunk/CHANGES.txt (original) +++ nutch/trunk/CHANGES.txt Wed Feb 3 13:51:10 2016 @@ -1,5 +1,7 @@ Nutch Change Log +* NUTCH-2197 Add Solr 5 cloud indexer support (Jurian Broertjes via markus) + * NUTCH-2206 Provide example scoring.similarity.stopword.file (sujen) * NUTCH-2204 Remove junit lib from runtime (snagel) Modified: nutch/trunk/src/plugin/indexer-solr/ivy.xml URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/ivy.xml?rev=1728313&r1=1728312&r2=1728313&view=diff ============================================================================== --- nutch/trunk/src/plugin/indexer-solr/ivy.xml (original) +++ nutch/trunk/src/plugin/indexer-solr/ivy.xml Wed Feb 3 13:51:10 2016 @@ -36,9 +36,9 @@ </publications> <dependencies> - <dependency org="org.apache.solr" name="solr-solrj" rev="4.10.2" conf="*->default"/> - <dependency org="org.apache.httpcomponents" name="httpclient" rev="4.3.1" conf="*->default"/> - <dependency org="org.apache.httpcomponents" name="httpmime" rev="4.3.1" conf="*->default"/> + <dependency org="org.apache.solr" name="solr-solrj" rev="5.4.1"/> + <dependency org="org.apache.httpcomponents" name="httpcore" rev="4.4.1" conf="*->default"/> + <dependency org="org.apache.httpcomponents" name="httpmime" rev="4.4.1" conf="*->default"/> </dependencies> </ivy-module> Modified: 
nutch/trunk/src/plugin/indexer-solr/plugin.xml URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/plugin.xml?rev=1728313&r1=1728312&r2=1728313&view=diff ============================================================================== --- nutch/trunk/src/plugin/indexer-solr/plugin.xml (original) +++ nutch/trunk/src/plugin/indexer-solr/plugin.xml Wed Feb 3 13:51:10 2016 @@ -22,17 +22,16 @@ <library name="indexer-solr.jar"> <export name="*" /> </library> - <library name="commons-codec-1.9.jar"/> - <library name="commons-io-2.3.jar"/> - <library name="commons-logging-1.1.3.jar"/> - <library name="httpclient-4.3.1.jar"/> - <library name="httpcore-4.3.jar"/> - <library name="httpmime-4.3.1.jar"/> - <library name="noggit-0.5.jar"/> - <library name="slf4j-api-1.7.6.jar"/> - <library name="solr-solrj-4.10.2.jar"/> - <library name="wstx-asl-3.2.7.jar"/> - <library name="zookeeper-3.4.6.jar"/> + <library name="commons-io-2.4.jar"/> + <library name="httpclient-4.4.1.jar"/> + <library name="httpcore-4.4.1.jar"/> + <library name="httpmime-4.4.1.jar"/> + <library name="noggit-0.6.jar"/> + <library name="slf4j-api-1.7.7.jar"/> + <library name="solr-solrj-5.4.1.jar"/> + <library name="stax2-api-3.1.4.jar"/> + <library name="woodstox-core-asl-4.4.1.jar"/> + <library name="zookeeper-3.4.6.jar"/> </runtime> <requires> Modified: nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java?rev=1728313&r1=1728312&r2=1728313&view=diff ============================================================================== --- nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java (original) +++ nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrConstants.java Wed Feb 3 13:51:10 2016 @@ -17,7 +17,6 @@ package org.apache.nutch.indexwriter.solr; 
public interface SolrConstants { - public static final String SOLR_PREFIX = "solr."; public static final String SERVER_URL = SOLR_PREFIX + "server.url"; @@ -31,13 +30,23 @@ public interface SolrConstants { public static final String USERNAME = SOLR_PREFIX + "auth.username"; public static final String PASSWORD = SOLR_PREFIX + "auth.password"; - - public static final String SERVER_TYPE = SOLR_PREFIX + "server.type"; - - public static final String ZOOKEEPER_URL = SOLR_PREFIX + "zookeeper.url"; - - public static final String LOADBALANCE_URLS = SOLR_PREFIX + "loadbalance.urls"; - + + public static final String COLLECTION = SOLR_PREFIX + "collection"; + + public static final String ZOOKEEPER_HOSTS = SOLR_PREFIX + "zookeeper.hosts"; + + public static final String ID_FIELD = "id"; + + public static final String URL_FIELD = "url"; + + public static final String BOOST_FIELD = "boost"; + + public static final String TIMESTAMP_FIELD = "tstamp"; + + public static final String DIGEST_FIELD = "digest"; + + + @Deprecated public static final String COMMIT_INDEX = SOLR_PREFIX + "commit.index"; Modified: nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java?rev=1728313&r1=1728312&r2=1728313&view=diff ============================================================================== --- nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java (original) +++ nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrIndexWriter.java Wed Feb 3 13:51:10 2016 @@ -17,6 +17,7 @@ package org.apache.nutch.indexwriter.solr; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -28,21 +29,33 @@ import org.apache.nutch.indexer.IndexWri import 
org.apache.nutch.indexer.IndexerMapReduce; import org.apache.nutch.indexer.NutchDocument; import org.apache.nutch.indexer.NutchField; -import org.apache.solr.client.solrj.SolrServer; +import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.DateUtil; +import org.apache.solr.common.util.NamedList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.nutch.util.HadoopFSUtil; +import org.apache.hadoop.util.StringUtils; +import org.apache.nutch.util.NutchConfiguration; + +// WORK AROUND FOR NOT REMOVING URL ENCODED URLS!!! +import java.net.URLDecoder; + public class SolrIndexWriter implements IndexWriter { public static final Logger LOG = LoggerFactory .getLogger(SolrIndexWriter.class); - private SolrServer solr; + private List<SolrClient> solrClients; private SolrMappingReader solrMapping; private ModifiableSolrParams params; @@ -50,18 +63,24 @@ public class SolrIndexWriter implements private final List<SolrInputDocument> inputDocs = new ArrayList<SolrInputDocument>(); + private final List<SolrInputDocument> updateDocs = new ArrayList<SolrInputDocument>(); + + private final List<String> deleteIds = new ArrayList<String>(); + private int batchSize; private int numDeletes = 0; + private int totalAdds = 0; + private int totalDeletes = 0; + private int totalUpdates = 0; private boolean delete = false; public void open(JobConf job, String name) throws IOException { - SolrServer server = SolrUtils.getSolrServer(job); - init(server, job); + solrClients = SolrUtils.getSolrClients(job); + init(solrClients, job); } // package 
protected for tests - void init(SolrServer server, JobConf job) throws IOException { - solr = server; + void init(List<SolrClient> solrClients, JobConf job) throws IOException { batchSize = job.getInt(SolrConstants.COMMIT_SIZE, 1000); solrMapping = SolrMappingReader.getInstance(job); delete = job.getBoolean(IndexerMapReduce.INDEXER_DELETE, false); @@ -81,13 +100,38 @@ public class SolrIndexWriter implements } public void delete(String key) throws IOException { + try { + key = URLDecoder.decode(key, "UTF8"); + } catch (UnsupportedEncodingException e) { + LOG.error("Error decoding: " + key); + throw new IOException("UnsupportedEncodingException for " + key); + } catch (IllegalArgumentException e) { + LOG.warn("Could not decode: " + key + ", it probably wasn't encoded in the first place.."); + } + + // escape solr hash separator + key = key.replaceAll("!", "\\!"); + if (delete) { - try { - solr.deleteById(key); - numDeletes++; - } catch (final SolrServerException e) { - throw makeIOException(e); + deleteIds.add(key); + totalDeletes++; + } + + if (deleteIds.size() >= batchSize) { + push(); + } + + } + + public void deleteByQuery(String query) throws IOException { + try { + LOG.info("SolrWriter: deleting " + query); + for (SolrClient solrClient : solrClients) { + solrClient.deleteByQuery(query); } + } catch (final SolrServerException e) { + LOG.error("Error deleting: " + deleteIds); + throw makeIOException(e); } } @@ -98,6 +142,7 @@ public class SolrIndexWriter implements public void write(NutchDocument doc) throws IOException { final SolrInputDocument inputDoc = new SolrInputDocument(); + for (final Entry<String, NutchField> e : doc) { for (final Object val : e.getValue().getValues()) { // normalise the string representation for a Date @@ -122,48 +167,65 @@ public class SolrIndexWriter implements inputDoc.setDocumentBoost(doc.getWeight()); inputDocs.add(inputDoc); + totalAdds++; + if (inputDocs.size() + numDeletes >= batchSize) { + push(); + } + } + + public void 
close() throws IOException { + commit(); + + for (SolrClient solrClient : solrClients) { + solrClient.close(); + } + } + + @Override + public void commit() throws IOException { + push(); + try { + for (SolrClient solrClient : solrClients) { + solrClient.commit(); + } + } catch (final SolrServerException e) { + LOG.error("Failed to commit solr connection: " + e.getMessage()); // FIXME + } + } + + public void push() throws IOException { + if (inputDocs.size() > 0) { try { LOG.info("Indexing " + Integer.toString(inputDocs.size()) - + " documents"); + + "/" + Integer.toString(totalAdds) + " documents"); LOG.info("Deleting " + Integer.toString(numDeletes) + " documents"); numDeletes = 0; UpdateRequest req = new UpdateRequest(); req.add(inputDocs); + req.setAction(AbstractUpdateRequest.ACTION.OPTIMIZE, false, false); req.setParams(params); - req.process(solr); + for (SolrClient solrClient : solrClients) { + NamedList res = solrClient.request(req); + } } catch (final SolrServerException e) { throw makeIOException(e); } inputDocs.clear(); } - } - public void close() throws IOException { - try { - if (!inputDocs.isEmpty()) { - LOG.info("Indexing " + Integer.toString(inputDocs.size()) - + " documents"); - if (numDeletes > 0) { - LOG.info("Deleting " + Integer.toString(numDeletes) + " documents"); + if (deleteIds.size() > 0) { + try { + LOG.info("SolrIndexer: deleting " + Integer.toString(deleteIds.size()) + + "/" + Integer.toString(totalDeletes) + " documents"); + for (SolrClient solrClient : solrClients) { + solrClient.deleteById(deleteIds); } - UpdateRequest req = new UpdateRequest(); - req.add(inputDocs); - req.setParams(params); - req.process(solr); - inputDocs.clear(); + } catch (final SolrServerException e) { + LOG.error("Error deleting: " + deleteIds); + throw makeIOException(e); } - } catch (final SolrServerException e) { - throw makeIOException(e); - } - } - - @Override - public void commit() throws IOException { - try { - solr.commit(); - } catch 
(SolrServerException e) { - throw makeIOException(e); + deleteIds.clear(); } } @@ -182,9 +244,10 @@ public class SolrIndexWriter implements public void setConf(Configuration conf) { config = conf; String serverURL = conf.get(SolrConstants.SERVER_URL); - if (serverURL == null) { - String message = "Missing Solr URL. Should be set via -D " - + SolrConstants.SERVER_URL; + String zkHosts = conf.get(SolrConstants.ZOOKEEPER_HOSTS); + if (serverURL == null && zkHosts == null) { + String message = "Missing SOLR URL and Zookeeper URL. Either on should be set via -D " + + SolrConstants.SERVER_URL + " or -D " + SolrConstants.ZOOKEEPER_HOSTS; message += "\n" + describe(); LOG.error(message); throw new RuntimeException(message); @@ -192,20 +255,17 @@ public class SolrIndexWriter implements } public String describe() { - StringBuffer sb = new StringBuffer("SolrIndexWriter\n"); - sb.append("\t").append(SolrConstants.SERVER_TYPE) - .append(" : Type of SolrServer to communicate with (default 'http' however options include 'cloud', 'lb' and 'concurrent')\n"); + StringBuffer sb = new StringBuffer("SOLRIndexWriter\n"); sb.append("\t").append(SolrConstants.SERVER_URL) - .append(" : URL of the Solr instance (mandatory)\n"); - sb.append("\t").append(SolrConstants.ZOOKEEPER_URL) - .append(" : URL of the Zookeeper URL (mandatory if 'cloud' value for solr.server.type)\n"); - sb.append("\t").append(SolrConstants.LOADBALANCE_URLS) - .append(" : Comma-separated string of Solr server strings to be used (madatory if 'lb' value for solr.server.type)\n"); + .append(" : URL of the SOLR instance\n"); + sb.append("\t").append(SolrConstants.ZOOKEEPER_HOSTS) + .append(" : URL of the Zookeeper quorum\n"); + sb.append("\t").append(SolrConstants.COMMIT_SIZE) + .append(" : buffer size when sending to SOLR (default 1000)\n"); sb.append("\t") .append(SolrConstants.MAPPING_FILE) - .append(" : name of the mapping file for fields (default solrindex-mapping.xml)\n"); - 
sb.append("\t").append(SolrConstants.COMMIT_SIZE) - .append(" : buffer size when sending to Solr (default 1000)\n"); + .append( + " : name of the mapping file for fields (default solrindex-mapping.xml)\n"); sb.append("\t").append(SolrConstants.USE_AUTH) .append(" : use authentication (default false)\n"); sb.append("\t").append(SolrConstants.USERNAME) Modified: nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java?rev=1728313&r1=1728312&r2=1728313&view=diff ============================================================================== --- nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java (original) +++ nutch/trunk/src/plugin/indexer-solr/src/java/org/apache/nutch/indexwriter/solr/SolrUtils.java Wed Feb 3 13:51:10 2016 @@ -16,20 +16,15 @@ */ package org.apache.nutch.indexwriter.solr; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; + +import java.util.ArrayList; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.mapred.JobConf; -import org.apache.solr.client.solrj.impl.CloudSolrServer; -import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer; -import org.apache.solr.client.solrj.impl.HttpSolrServer; -import org.apache.solr.client.solrj.impl.LBHttpSolrServer; -import org.apache.solr.client.solrj.SolrServer; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; import java.net.MalformedURLException; @@ -37,64 +32,45 
@@ public class SolrUtils { public static Logger LOG = LoggerFactory.getLogger(SolrUtils.class); - private static SolrServer server; - - public static SolrServer getSolrServer(JobConf job) - throws MalformedURLException { - - boolean auth = job.getBoolean(SolrConstants.USE_AUTH, false); - - CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - // Check for username/password - if (auth) { - String username = job.get(SolrConstants.USERNAME); - LOG.info("Authenticating as: " + username); - AuthScope scope = new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, - AuthScope.ANY_REALM, AuthScope.ANY_SCHEME); - credentialsProvider.setCredentials(scope, - new UsernamePasswordCredentials(username, job.get(SolrConstants.PASSWORD))); - } - CloseableHttpClient client = - HttpClientBuilder.create().setDefaultCredentialsProvider(credentialsProvider).build(); - - String solrServer = job.get(SolrConstants.SERVER_TYPE, "http"); - String zkHost = job.get(SolrConstants.ZOOKEEPER_URL, null); - String solrServerUrl = job.get(SolrConstants.SERVER_URL); - - switch (solrServer) { - case "cloud": - server = new CloudSolrServer(zkHost); - LOG.debug("CloudSolrServer selected as indexing server."); - break; - case "concurrent": - server = new ConcurrentUpdateSolrServer(solrServerUrl, client, 1000, 10); - LOG.debug("ConcurrentUpdateSolrServer selected as indexing server."); - break; - case "http": - if (auth) { - server = new HttpSolrServer(solrServerUrl, client); - } else { - server = new HttpSolrServer(solrServerUrl); + /** + * + * + * @param JobConf + * @return SolrClient + */ + public static ArrayList<SolrClient> getSolrClients(JobConf job) throws MalformedURLException { + String[] urls = job.getStrings(SolrConstants.SERVER_URL); + String[] zkHostString = job.getStrings(SolrConstants.ZOOKEEPER_HOSTS); + ArrayList<SolrClient> solrClients = new ArrayList<SolrClient>(); + + if (zkHostString != null && zkHostString.length > 0) { + for (int i = 0; i < zkHostString.length; 
i++) { + CloudSolrClient sc = getCloudSolrClient(zkHostString[i]); + sc.setDefaultCollection(job.get(SolrConstants.COLLECTION)); + solrClients.add(sc); } - LOG.debug("HttpSolrServer selected as indexing server."); - break; - case "lb": - String[] lbServerString = job.get(SolrConstants.LOADBALANCE_URLS).split(","); - server = new LBHttpSolrServer(client, lbServerString); - LOG.debug("LBHttpSolrServer selected as indexing server."); - break; - default: - if (auth) { - server = new HttpSolrServer(solrServerUrl, client); - } else { - server = new HttpSolrServer(solrServerUrl); + } else { + for (int i = 0; i < urls.length; i++) { + SolrClient sc = new HttpSolrClient(urls[i]); + solrClients.add(sc); } - LOG.debug("HttpSolrServer selected as indexing server."); - break; } - return server; + + return solrClients; + } + + public static CloudSolrClient getCloudSolrClient(String url) throws MalformedURLException { + CloudSolrClient sc = new CloudSolrClient(url.replace('|', ',')); + sc.setParallelUpdates(true); + sc.connect(); + return sc; } + public static SolrClient getHttpSolrClient(String url) throws MalformedURLException { + SolrClient sc =new HttpSolrClient(url); + return sc; + } + public static String stripNonCharCodepoints(String input) { StringBuilder retval = new StringBuilder(); char ch; @@ -117,4 +93,5 @@ public class SolrUtils { return retval.toString(); } -} \ No newline at end of file + +}