AMBARI-19033. Log Search: Cannot increase the number of shards per node for solr collections (oleewere)
Change-Id: I44fa2c96befa2eff3d6f2acfb608eff8ed45f021 Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/195b7456 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/195b7456 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/195b7456 Branch: refs/heads/branch-dev-patch-upgrade Commit: 195b7456c0b4dc15833b502463daba7b29a51bee Parents: 337c5f2 Author: oleewere <oleew...@gmail.com> Authored: Thu Dec 1 11:34:45 2016 +0100 Committer: oleewere <oleew...@gmail.com> Committed: Thu Dec 1 11:37:16 2016 +0100 ---------------------------------------------------------------------- .../ambari/logsearch/dao/SolrCollectionDao.java | 64 ++++++++++++++++++-- 1 file changed, 59 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/195b7456/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/dao/SolrCollectionDao.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/dao/SolrCollectionDao.java b/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/dao/SolrCollectionDao.java index 834ba38..da76924 100644 --- a/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/dao/SolrCollectionDao.java +++ b/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/dao/SolrCollectionDao.java @@ -20,8 +20,12 @@ package org.apache.ambari.logsearch.dao; import org.apache.ambari.logsearch.conf.SolrPropsConfig; import org.apache.commons.lang.StringUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.common.SolrException; @@ -33,6 +37,7 @@ import org.slf4j.LoggerFactory; import static org.apache.ambari.logsearch.solr.SolrConstants.CommonLogConstants.ROUTER_FIELD; import javax.inject.Named; +import javax.ws.rs.core.Response; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -45,6 +50,8 @@ class SolrCollectionDao { private static final Logger LOG = LoggerFactory.getLogger(SolrCollectionDao.class); private static final int SETUP_RETRY_SECOND = 30; + private static final String MODIFY_COLLECTION_QUERY = "/admin/collections?action=MODIFYCOLLECTION&collection=%s&%s=%d"; + private static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode"; /** * This will try to get the collections from the Solr. Ping doesn't work if @@ -189,7 +196,16 @@ class SolrCollectionDao { } } else { LOG.info("Collection " + solrPropsConfig.getCollection() + " is already there. Will check whether it has the required shards"); - Collection<String> existingShards = getShards(solrClient, solrPropsConfig); + Collection<Slice> slices = getSlices(solrClient, solrPropsConfig); + Collection<String> existingShards = getShards(slices, solrPropsConfig); + if (existingShards.size() < shardsList.size()) { + try { + updateMaximumNumberOfShardsPerCore(slices, solrPropsConfig); + } catch (Throwable t) { + returnValue = false; + LOG.error(String.format("Exception during updating collection (%s)", t)); + } + } for (String shard : shardsList) { if (!existingShards.contains(shard)) { try { @@ -216,10 +232,44 @@ class SolrCollectionDao { return returnValue; } - private Collection<String> getShards(CloudSolrClient solrClient, SolrPropsConfig solrPropsConfig) { - Collection<String> list = new HashSet<>(); + private String getRandomBaseUrl(Collection<Slice> slices) { + String coreUrl = null; + if (slices != null) { + for (Slice slice : slices) { + if (!slice.getReplicas().isEmpty()) { + Replica replica = slice.getReplicas().iterator().next(); + coreUrl = replica.getStr("base_url"); + if (coreUrl != null) { + break; + } + } + } + } + return coreUrl; + } + + private void updateMaximumNumberOfShardsPerCore(Collection<Slice> slices, SolrPropsConfig solrPropsConfig) throws IOException { + String baseUrl = getRandomBaseUrl(slices); + if (baseUrl != null) { + CloseableHttpClient httpClient = HttpClientUtil.createClient(null); + HttpGet request = new HttpGet(baseUrl + String.format(MODIFY_COLLECTION_QUERY, + solrPropsConfig.getCollection(), MAX_SHARDS_PER_NODE, calculateMaxShardsPerNode(solrPropsConfig))); + HttpResponse response = httpClient.execute(request); + if (response.getStatusLine().getStatusCode() != Response.Status.OK.getStatusCode()) { + throw new IllegalStateException(String.format("Cannot update collection (%s) - increase max number of nodes per core", solrPropsConfig.getCollection())); + } + } else { + throw new IllegalStateException(String.format("Cannot get any core url for updating collection (%s)", solrPropsConfig.getCollection())); + } + } + + private Collection<Slice> getSlices(CloudSolrClient solrClient, SolrPropsConfig solrPropsConfig) { ZkStateReader reader = solrClient.getZkStateReader(); - Collection<Slice> slices = reader.getClusterState().getSlices(solrPropsConfig.getCollection()); + return reader.getClusterState().getSlices(solrPropsConfig.getCollection()); + } + + private Collection<String> getShards(Collection<Slice> slices, SolrPropsConfig solrPropsConfig) { + Collection<String> list = new HashSet<>(); for (Slice slice : slices) { for (Replica replica : slice.getReplicas()) { LOG.info("colName=" + solrPropsConfig.getCollection() + ", slice.name=" + slice.getName() + ", slice.state=" + slice.getState() + @@ -245,7 +295,7 @@ class SolrCollectionDao { collectionCreateRequest.setNumShards(solrPropsConfig.getNumberOfShards()); collectionCreateRequest.setReplicationFactor(solrPropsConfig.getReplicationFactor()); collectionCreateRequest.setConfigName(solrPropsConfig.getConfigName()); - collectionCreateRequest.setMaxShardsPerNode(solrPropsConfig.getReplicationFactor() * solrPropsConfig.getNumberOfShards()); + collectionCreateRequest.setMaxShardsPerNode(calculateMaxShardsPerNode(solrPropsConfig)); CollectionAdminResponse createResponse = collectionCreateRequest.process(solrClient); if (createResponse.getStatus() != 0) { LOG.error("Error creating collection. collectionName=" + solrPropsConfig.getCollection() + ", response=" + createResponse); @@ -256,4 +306,8 @@ class SolrCollectionDao { return true; } } + + private Integer calculateMaxShardsPerNode(SolrPropsConfig solrPropsConfig) { + return solrPropsConfig.getReplicationFactor() * solrPropsConfig.getNumberOfShards(); + } }