WAVE-311: Applies full-text search with Solr (patch by Frank Ren, https://reviews.apache.org/r/16322), but without SolrRobot.
Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/da20b429 Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/da20b429 Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/da20b429 Branch: refs/heads/master Commit: da20b429ad3356b9b4b610173dca3cc38de03aed Parents: 20ae1d8 Author: Frank R <[email protected]> Authored: Fri Apr 11 19:55:00 2014 +0300 Committer: Yuri Zelikov <[email protected]> Committed: Wed Aug 27 19:17:55 2014 +0300 ---------------------------------------------------------------------- run-export.sh | 2 +- run-import.sh | 2 +- server.config.example | 4 +- src/com/google/wave/api/Annotation.java | 1 + .../waveprotocol/box/server/SearchModule.java | 13 +- src/org/waveprotocol/box/server/ServerMain.java | 4 +- .../robots/agent/welcome/WelcomeRobot.java | 2 +- .../waveserver/SimpleSearchProviderImpl.java | 13 +- .../waveserver/SolrSearchProviderImpl.java | 298 +++++++++++ .../server/waveserver/SolrWaveIndexerImpl.java | 498 +++++++++++++++++++ .../box/server/waveserver/WaveDigester.java | 85 ++-- 11 files changed, 869 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/run-export.sh ---------------------------------------------------------------------- diff --git a/run-export.sh b/run-export.sh index b74e376..3d4b8e6 100644 --- a/run-export.sh +++ b/run-export.sh @@ -22,4 +22,4 @@ # The version of Wave in a Box, extracted from the build.properties file WAVEINABOX_VERSION=`sed "s/[\\t ]*=[\\t ]*/=/g" build.properties | grep ^waveinabox.version= | cut -f2 -d=` -exec java -cp dist/waveinabox-export-import-$WAVEINABOX_VERSION.jar org.waveprotocol.box.expimp.WaveExport $* +exec java -cp dist/wave-in-a-box-export-import-$WAVEINABOX_VERSION.jar org.waveprotocol.box.expimp.WaveExport $* 
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/run-import.sh ---------------------------------------------------------------------- diff --git a/run-import.sh b/run-import.sh index f338cd4..f21d040 100644 --- a/run-import.sh +++ b/run-import.sh @@ -22,4 +22,4 @@ # The version of Wave in a Box, extracted from the build.properties file WAVEINABOX_VERSION=`sed "s/[\\t ]*=[\\t ]*/=/g" build.properties | grep ^waveinabox.version= | cut -f2 -d=` -exec java -cp dist/waveinabox-export-import-$WAVEINABOX_VERSION.jar org.waveprotocol.box.expimp.WaveImport $* +exec java -cp dist/wave-in-a-box-export-import-$WAVEINABOX_VERSION.jar org.waveprotocol.box.expimp.WaveImport $* http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/server.config.example ---------------------------------------------------------------------- diff --git a/server.config.example b/server.config.example index f04b19f..ae515ab 100644 --- a/server.config.example +++ b/server.config.example @@ -22,7 +22,7 @@ # to override wave_server_domain run: ant -f server-config.xml -Dwave_server_domain=example.com -# Domain name of the wave server +# Domain name of the wave server # Default value: local.net wave_server_domain = @WAVE_SERVER_DOMAIN@ @@ -181,7 +181,7 @@ clientauth_cert_domain = @CLIENTAUTH_CERT_DOMAIN@ # Disable login page to force x509-only authentication disable_loginpage = @DISABLE_LOGINPAGE@ -# Currently supported search types: memory, lucene. +# Currently supported search types: memory, lucene, solr. # Default value: lucene. 
search_type = @SEARCH_TYPE@ http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/src/com/google/wave/api/Annotation.java ---------------------------------------------------------------------- diff --git a/src/com/google/wave/api/Annotation.java b/src/com/google/wave/api/Annotation.java index b55f778..5b137b2 100644 --- a/src/com/google/wave/api/Annotation.java +++ b/src/com/google/wave/api/Annotation.java @@ -43,6 +43,7 @@ public class Annotation implements Serializable { public static final String TEXT_DECORATION = "style/textDecoration"; public static final String VERTICAL_ALIGN = "style/verticalAlign"; public static final String LINK = "link/manual"; + public static final String WAVE_LINK = "link/wave"; /** The annotation name. */ private final String name; http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/src/org/waveprotocol/box/server/SearchModule.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/SearchModule.java b/src/org/waveprotocol/box/server/SearchModule.java index 2de0ef9..9848a7f 100644 --- a/src/org/waveprotocol/box/server/SearchModule.java +++ b/src/org/waveprotocol/box/server/SearchModule.java @@ -37,6 +37,8 @@ import org.waveprotocol.box.server.waveserver.PerUserWaveViewHandler; import org.waveprotocol.box.server.waveserver.PerUserWaveViewProvider; import org.waveprotocol.box.server.waveserver.SearchProvider; import org.waveprotocol.box.server.waveserver.SimpleSearchProviderImpl; +import org.waveprotocol.box.server.waveserver.SolrSearchProviderImpl; +import org.waveprotocol.box.server.waveserver.SolrWaveIndexerImpl; import org.waveprotocol.box.server.waveserver.WaveIndexer; /** @@ -56,8 +58,8 @@ public class SearchModule extends AbstractModule { @Override public void configure() { - bind(SearchProvider.class).to(SimpleSearchProviderImpl.class).in(Singleton.class); if ("lucene".equals(searchType)) { + 
bind(SearchProvider.class).to(SimpleSearchProviderImpl.class).in(Singleton.class); bind(PerUserWaveViewProvider.class).to(LucenePerUserWaveViewHandlerImpl.class).in( Singleton.class); bind(PerUserWaveViewBus.Listener.class).to(LucenePerUserWaveViewHandlerImpl.class).in( @@ -70,7 +72,16 @@ public class SearchModule extends AbstractModule { } else { bind(WaveIndexer.class).to(NoOpWaveIndexerImpl.class); } + } else if ("solr".equals(searchType)) { + bind(SearchProvider.class).to(SolrSearchProviderImpl.class).in(Singleton.class); + /*- + * (Frank R.) binds to class with dummy methods just because it's required by + * org.waveprotocol.box.server.ServerMain.initializeSearch(Injector, WaveBus) + */ + bind(PerUserWaveViewBus.Listener.class).to(SolrWaveIndexerImpl.class).in(Singleton.class); + bind(WaveIndexer.class).to(SolrWaveIndexerImpl.class).in(Singleton.class); } else if ("memory".equals(searchType)) { + bind(SearchProvider.class).to(SimpleSearchProviderImpl.class).in(Singleton.class); bind(PerUserWaveViewProvider.class).to(MemoryPerUserWaveViewHandlerImpl.class).in( Singleton.class); bind(PerUserWaveViewBus.Listener.class).to(MemoryPerUserWaveViewHandlerImpl.class).in( http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/src/org/waveprotocol/box/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/ServerMain.java b/src/org/waveprotocol/box/server/ServerMain.java index a06db12..5ae8779 100644 --- a/src/org/waveprotocol/box/server/ServerMain.java +++ b/src/org/waveprotocol/box/server/ServerMain.java @@ -54,11 +54,13 @@ import org.waveprotocol.box.server.robots.agent.welcome.WelcomeRobot; import org.waveprotocol.box.server.robots.dataapi.DataApiOAuthServlet; import org.waveprotocol.box.server.robots.dataapi.DataApiServlet; import org.waveprotocol.box.server.robots.passive.RobotsGateway; +import org.waveprotocol.box.server.rpc.AttachmentInfoServlet; import 
org.waveprotocol.box.server.rpc.AttachmentServlet; import org.waveprotocol.box.server.rpc.AuthenticationServlet; import org.waveprotocol.box.server.rpc.FetchProfilesServlet; import org.waveprotocol.box.server.rpc.FetchServlet; import org.waveprotocol.box.server.rpc.GadgetProviderServlet; +import org.waveprotocol.box.server.rpc.LocaleServlet; import org.waveprotocol.box.server.rpc.NotificationServlet; import org.waveprotocol.box.server.rpc.SearchServlet; import org.waveprotocol.box.server.rpc.ServerRpcProvider; @@ -73,8 +75,6 @@ import org.waveprotocol.box.server.waveserver.WaveIndexer; import org.waveprotocol.box.server.waveserver.WaveServerException; import org.waveprotocol.box.server.waveserver.WaveletProvider; import org.waveprotocol.box.server.waveserver.WaveletStateException; -import org.waveprotocol.box.server.rpc.AttachmentInfoServlet; -import org.waveprotocol.box.server.rpc.AttachmentServlet; import org.waveprotocol.wave.crypto.CertPathStore; import org.waveprotocol.wave.federation.FederationSettings; import org.waveprotocol.wave.federation.FederationTransport; http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/src/org/waveprotocol/box/server/robots/agent/welcome/WelcomeRobot.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/robots/agent/welcome/WelcomeRobot.java b/src/org/waveprotocol/box/server/robots/agent/welcome/WelcomeRobot.java index 2735940..ba041cd 100755 --- a/src/org/waveprotocol/box/server/robots/agent/welcome/WelcomeRobot.java +++ b/src/org/waveprotocol/box/server/robots/agent/welcome/WelcomeRobot.java @@ -56,7 +56,7 @@ import java.util.logging.Logger; public class WelcomeRobot extends AbstractBaseRobotAgent { - private static final Logger LOG = Logger.getLogger(PasswordRobot.class.getName()); + private static final Logger LOG = Logger.getLogger(WelcomeRobot.class.getName()); public static final String ROBOT_URI = AGENT_PREFIX_URI + "/welcome"; /** The id of 
the wave that serves as a template for the welcome wave. */ http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/src/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImpl.java b/src/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImpl.java index 3c69816..ea75231 100644 --- a/src/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImpl.java +++ b/src/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImpl.java @@ -64,8 +64,7 @@ public class SimpleSearchProviderImpl extends AbstractSearchProviderImpl { } @Override - public SearchResult search(final ParticipantId user, String query, int startAt, - int numResults) { + public SearchResult search(final ParticipantId user, String query, int startAt, int numResults) { LOG.fine("Search query '" + query + "' from user: " + user + " [" + startAt + ", " + (startAt + numResults - 1) + "]"); Map<TokenQueryType, Set<String>> queryParams = null; @@ -133,9 +132,9 @@ public class SimpleSearchProviderImpl extends AbstractSearchProviderImpl { return currentUserWavesView; } - private Function<ReadableWaveletData, Boolean> createFilterWaveletsFunction(final ParticipantId user, - final boolean isAllQuery, final List<ParticipantId> withParticipantIds, - final List<ParticipantId> creatorParticipantIds) { + private Function<ReadableWaveletData, Boolean> createFilterWaveletsFunction( + final ParticipantId user, final boolean isAllQuery, + final List<ParticipantId> withParticipantIds, final List<ParticipantId> creatorParticipantIds) { // A function to be applied by the WaveletContainer. 
Function<ReadableWaveletData, Boolean> matchesFunction = new Function<ReadableWaveletData, Boolean>() { @@ -143,8 +142,8 @@ public class SimpleSearchProviderImpl extends AbstractSearchProviderImpl { @Override public Boolean apply(ReadableWaveletData wavelet) { try { - return isWaveletMatchesCriteria(wavelet, user, sharedDomainParticipantId, withParticipantIds, - creatorParticipantIds, isAllQuery); + return isWaveletMatchesCriteria(wavelet, user, sharedDomainParticipantId, + withParticipantIds, creatorParticipantIds, isAllQuery); } catch (WaveletStateException e) { LOG.warning( "Failed to access wavelet " http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/src/org/waveprotocol/box/server/waveserver/SolrSearchProviderImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/SolrSearchProviderImpl.java b/src/org/waveprotocol/box/server/waveserver/SolrSearchProviderImpl.java new file mode 100644 index 0000000..607a295 --- /dev/null +++ b/src/org/waveprotocol/box/server/waveserver/SolrSearchProviderImpl.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.waveprotocol.box.server.waveserver; + +import com.google.common.base.Function; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.inject.Inject; +import com.google.inject.name.Named; +import com.google.wave.api.SearchResult; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.URI; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.http.HttpStatus; +import org.waveprotocol.box.server.CoreSettings; +import org.waveprotocol.wave.model.id.WaveId; +import org.waveprotocol.wave.model.id.WaveletId; +import org.waveprotocol.wave.model.wave.ParticipantId; +import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; +import org.waveprotocol.wave.model.wave.data.WaveViewData; +import org.waveprotocol.wave.util.logging.Log; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * Search provider that offers full-text search by querying a Solr server over HTTP (see SOLR_BASE_URL). + * + * @author Frank R.
<[email protected]>  */ public class SolrSearchProviderImpl extends SimpleSearchProviderImpl implements SearchProvider { + + private static final Log LOG = Log.get(SolrSearchProviderImpl.class); + + private static final String WORD_START = "(\\b|^)"; + private static final Pattern IN_PATTERN = Pattern.compile("\\bin:\\S*"); + + /** Lower bound on the page size requested from Solr per round trip in search. */ public static final int ROWS = 10; + + /* Solr field names; the suffixes (_s, _l, _ss, _txt, _t) presumably map to dynamic-field types in the Solr schema - confirm against the schema. */ public static final String ID = "id"; + public static final String WAVE_ID = "waveId_s"; + public static final String WAVELET_ID = "waveletId_s"; + public static final String DOC_NAME = "docName_s"; + public static final String LMT = "lmt_l"; + public static final String WITH = "with_ss"; + public static final String WITH_FUZZY = "with_txt"; + public static final String CREATOR = "creator_t"; + public static final String TEXT = "text_t"; + public static final String IN = "in_ss"; + + /* + * TODO (Frank R.) make it configurable + */ + public static final String SOLR_BASE_URL = "http://localhost:8983/solr"; + + /*- + * http://wiki.apache.org/solr/CommonQueryParameters#q + * + * (regression alert) the TEXT clause at the end is commented out so that empty waves can still be listed in search results + */ + public static final String Q = WAVE_ID + ":[* TO *]" // + + " AND " + WAVELET_ID + ":[* TO *]" // + + " AND " + DOC_NAME + ":[* TO *]" // + + " AND " + LMT + ":[* TO *]" // + + " AND " + WITH + ":[* TO *]" // + + " AND " + WITH_FUZZY + ":[* TO *]" // + + " AND " + CREATOR + ":[* TO *]" // + /* + " AND " + TEXT + ":[* TO *]" */; + + /*- + * XXX (Frank R.)
(experimental and disabled) edismax query parser + * + * mm (Minimum 'Should' Match) + * http://wiki.apache.org/solr/ExtendedDisMax#mm_.28Minimum_.27Should.27_Match.29 + * + * !edismax ignores "q.op=AND", see + * + * ExtendedDismaxQParser (edismax) does not obey q.op for queries with operators + * https://issues.apache.org/jira/browse/SOLR-3741 + * + * ExtendedDismaxQParser (edismax) does not obey q.op for parenthesized sub-queries + * https://issues.apache.org/jira/browse/SOLR-3740 + */ + // public static final String FILTER_QUERY_PREFIX = "{!edismax q.op=AND df=" + + // TEXT + "}" // + // + WITH + ":"; + private static final String FILTER_QUERY_PREFIX = "{!lucene q.op=AND df=" + TEXT + "}" // + + WITH + ":"; + + + /** Accept-all wavelet filter passed to filterWavesViewBySearchCriteria; limiting results to the requesting user presumably relies on the Solr filter query built by buildFilterQuery. NOTE(review): this field is public, static and not final, so any code can replace it; consider making it final. */ public static Function<ReadableWaveletData, Boolean> matchesFunction = + new Function<ReadableWaveletData, Boolean>() { + + @Override + public Boolean apply(ReadableWaveletData wavelet) { + return true; + } + }; + + /** Rewrites the IN, WITH and CREATOR query tokens (presumably in:, with: and creator:) into the Solr fields IN, WITH_FUZZY and CREATOR. NOTE(review): the rest of the query reaches Solr unescaped, so Lucene syntax typed by a user (for example an unbalanced parenthesis) can make the whole filter query fail to parse. */ public static String buildUserQuery(String query) { + return query.replaceAll(WORD_START + TokenQueryType.IN.getToken() + ":", IN + ":") + .replaceAll(WORD_START + TokenQueryType.WITH.getToken() + ":", WITH_FUZZY + ":") + .replaceAll(WORD_START + TokenQueryType.CREATOR.getToken() + ":", CREATOR + ":"); + } + + /** Creates the provider; passes null as the last argument of the SimpleSearchProviderImpl constructor. NOTE(review): confirm the superclass tolerates null there. */ @Inject + public SolrSearchProviderImpl(WaveDigester digester, WaveMap waveMap, + @Named(CoreSettings.WAVE_SERVER_DOMAIN) String waveDomain) { + super(waveDomain, digester, waveMap, null); + } + + /** Pages through the Solr documents matching the user-specific filter query, collects their (waveId, waveletId) pairs, passes them through filterWavesViewBySearchCriteria and returns the requested [startAt, startAt plus numResults) slice as a SearchResult. A non-200 HTTP status or an IOException yields digester.generateSearchResult(user, query, null). */ @Override + public SearchResult search(final ParticipantId user, String query, int startAt, int numResults) { + LOG.fine("Search query '" + query + "' from user: " + user + " [" + startAt + ", " + + (startAt + numResults - 1) + "]"); + + /*- + * see + * org.waveprotocol.box.server.waveserver.SimpleSearchProviderImpl.search(ParticipantId, String, int, int).isAllQuery + */ + // Maybe should be changed in case other folders in addition to 'inbox' are + // added.
+ final boolean isAllQuery = isAllQuery(query); + + Multimap<WaveId, WaveletId> currentUserWavesView = HashMultimap.create(); + + if (numResults > 0) { + + /* NOTE(review): Solr is told to skip the first startAt documents, and Solr documents are per blip rather than per wave; computeSearchResult later skips startAt waves a second time, so pages after the first appear to lose results. Every remaining page is fetched anyway, so starting Solr at 0 looks intended - verify. */ int start = startAt; + int rows = Math.max(numResults, ROWS); + + /*- + * "fq" stands for Filter Query. see + * http://wiki.apache.org/solr/CommonQueryParameters#fq + */ + String fq = buildFilterQuery(query, isAllQuery, user.getAddress(), sharedDomainParticipantId); + + GetMethod getMethod = new GetMethod(); + try { + while (true) { + /* NOTE(review): Q and fq are concatenated into the URL without per-parameter encoding, and URI(String, false) is asked to escape the whole string; characters that are legal in a URI query, such as an ampersand or a plus sign, are presumably left as-is, so an ampersand in the user query would start a new Solr request parameter and a plus sign would reach Solr as a space. Encoding each value with java.net.URLEncoder (UTF-8) and passing escaped=true would avoid this - verify against the commons-httpclient URI documentation. */ getMethod.setURI(new URI(SOLR_BASE_URL + "/select?wt=json" + "&start=" + start + "&rows=" + + rows + "&q=" + Q + "&fq=" + fq, false)); + + /* NOTE(review): a new HttpClient is created for every page and the same GetMethod is executed again without releaseConnection() in between; confirm commons-httpclient 3.x supports reusing a method this way. */ HttpClient httpClient = new HttpClient(); + int statusCode = httpClient.executeMethod(getMethod); + if (statusCode != HttpStatus.SC_OK) { + LOG.warning("Failed to execute query: " + query); + return digester.generateSearchResult(user, query, null); + } + + JsonObject json = + /* NOTE(review): InputStreamReader without a charset decodes with the platform default, while the Solr response is presumably UTF-8. A JsonParseException, or a reply without a response member (responseJson would be null), is not handled here. */ new JsonParser().parse(new InputStreamReader(getMethod.getResponseBodyAsStream())) + .getAsJsonObject(); + JsonObject responseJson = json.getAsJsonObject("response"); + JsonArray docsJson = responseJson.getAsJsonArray("docs"); + if (docsJson.size() == 0) { + break; + } + + Iterator<JsonElement> docJsonIterator = docsJson.iterator(); + while (docJsonIterator.hasNext()) { + JsonObject docJson = docJsonIterator.next().getAsJsonObject(); + + /* + * TODO (Frank R.) c.f. + * org.waveprotocol.box.server.waveserver.SimpleSearchProviderImpl + * .isWaveletMatchesCriteria(ReadableWaveletData, ParticipantId, + * ParticipantId, List<ParticipantId>, List<ParticipantId>, boolean) + */ + + WaveId waveId = WaveId.deserialise(docJson.getAsJsonPrimitive(WAVE_ID).getAsString()); + WaveletId waveletId = + WaveletId.deserialise(docJson.getAsJsonPrimitive(WAVELET_ID).getAsString()); + currentUserWavesView.put(waveId, waveletId); + + /*- + * XXX (Frank R.) (experimental and disabled) reduce round trips to solr + * + * the result list will be filtered.
so we need all results + */ + // if (currentUserWavesView.size() >= numResults) { + // break; + // } + } + + /*- + * XXX (Frank R.) (experimental and disabled) reduce round trips to solr + * + * the result list will be filtered. so we need all results + */ + // if (currentUserWavesView.size() >= numResults) { + // break; + // } + + /* + * there won't be any more results - stop querying next page of + * results + */ + if (docsJson.size() < rows) { + break; + } + + start += rows; + } + + } catch (IOException e) { + /* NOTE(review): the caught exception e is not passed to the logger, so the cause is lost. */ LOG.warning("Failed to execute query: " + query); + return digester.generateSearchResult(user, query, null); + } finally { + getMethod.releaseConnection(); + } + } + + /* NOTE(review): currentUserWavesView is a HashMultimap, so the order in which Solr returned documents is presumably not preserved here. */ Map<WaveId, WaveViewData> results = + filterWavesViewBySearchCriteria(matchesFunction, currentUserWavesView); + if (LOG.isFineLoggable()) { + for (Map.Entry<WaveId, WaveViewData> e : results.entrySet()) { + LOG.fine("filtered results contains: " + e.getKey()); + } + } + + Collection<WaveViewData> searchResult = computeSearchResult(user, startAt, numResults, results); + LOG.info("Search response to '" + query + "': " + searchResult.size() + " results, user: " + + user); + return digester.generateSearchResult(user, query, searchResult); + } + + /** Returns true when the query contains no in: token (see IN_PATTERN), i.e. it is not restricted to a folder such as inbox. */ public static boolean isAllQuery(String query) { + return !IN_PATTERN.matcher(query).find(); + } + + /** Builds the Solr filter query: the WITH field must contain addressOfRequiredParticipant (or, when isAllQuery is true, either that address or sharedDomainParticipantId), and a non-empty user query, rewritten by buildUserQuery, is ANDed in parentheses. NOTE(review): the user query is spliced in unescaped; confirm no input can weaken the participant restriction, since matchesFunction accepts every wavelet afterwards. Sending the participant restriction and the user query as two separate fq parameters would keep them independent. */ public static String buildFilterQuery(String query, final boolean isAllQuery, + String addressOfRequiredParticipant, ParticipantId sharedDomainParticipantId) { + + String fq; + if (isAllQuery) { + fq = + FILTER_QUERY_PREFIX + "(" + addressOfRequiredParticipant + " OR " + + sharedDomainParticipantId + ")"; + } else { + fq = FILTER_QUERY_PREFIX + addressOfRequiredParticipant; + } + if (query.length() > 0) { + fq += " AND (" + buildUserQuery(query) + ")"; + } + + return fq; + } + + /*- + * copied with modification from + * org.waveprotocol.box.server.waveserver.SimpleSearchProviderImpl.computeSearchResult(ParticipantId, int, int, Map<TokenQueryType,
Set<String>>, Map<WaveId, WaveViewData>) + * + * removed queryParams + */ + /** Returns the [startAt, startAt plus numResults) slice of results.values(), or an empty list when startAt is at or past the end; the user parameter is unused. */ private Collection<WaveViewData> computeSearchResult(final ParticipantId user, int startAt, + int numResults, Map<WaveId, WaveViewData> results) { + List<WaveViewData> searchResultslist = null; + int searchResultSize = results.values().size(); + // Check if we have enough results to return. + if (searchResultSize < startAt) { + searchResultslist = Collections.emptyList(); + } else { + int endAt = Math.min(startAt + numResults, searchResultSize); + searchResultslist = new ArrayList<WaveViewData>(results.values()).subList(startAt, endAt); + } + return searchResultslist; + } +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/src/org/waveprotocol/box/server/waveserver/SolrWaveIndexerImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/SolrWaveIndexerImpl.java b/src/org/waveprotocol/box/server/waveserver/SolrWaveIndexerImpl.java new file mode 100644 index 0000000..0b57272 --- /dev/null +++ b/src/org/waveprotocol/box/server/waveserver/SolrWaveIndexerImpl.java @@ -0,0 +1,498 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.waveprotocol.box.server.waveserver; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.URI; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.RequestEntity; +import org.apache.commons.httpclient.methods.StringRequestEntity; +import org.apache.http.HttpStatus; +import org.waveprotocol.box.common.DeltaSequence; +import org.waveprotocol.box.common.DocumentConstants; +import org.waveprotocol.box.server.robots.util.ConversationUtil; +import org.waveprotocol.wave.model.conversation.ObservableConversation; +import org.waveprotocol.wave.model.conversation.ObservableConversationBlip; +import org.waveprotocol.wave.model.conversation.ObservableConversationView; +import org.waveprotocol.wave.model.conversation.TitleHelper; +import org.waveprotocol.wave.model.conversation.WaveletBasedConversation; +import org.waveprotocol.wave.model.document.Document; +import org.waveprotocol.wave.model.document.operation.AnnotationBoundaryMap; +import org.waveprotocol.wave.model.document.operation.Attributes; +import org.waveprotocol.wave.model.document.operation.AttributesUpdate; +import org.waveprotocol.wave.model.document.operation.DocOp; +import org.waveprotocol.wave.model.document.operation.DocOpCursor; +import org.waveprotocol.wave.model.document.operation.impl.InitializationCursorAdapter; +import org.waveprotocol.wave.model.id.IdUtil; +import org.waveprotocol.wave.model.id.WaveId; +import org.waveprotocol.wave.model.id.WaveletId; +import 
org.waveprotocol.wave.model.id.WaveletName; +import org.waveprotocol.wave.model.version.HashedVersion; +import org.waveprotocol.wave.model.wave.ParticipantId; +import org.waveprotocol.wave.model.wave.data.ObservableWaveletData; +import org.waveprotocol.wave.model.wave.data.ReadableBlipData; +import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; +import org.waveprotocol.wave.model.wave.data.impl.WaveViewDataImpl; +import org.waveprotocol.wave.model.wave.opbased.OpBasedWavelet; +import org.waveprotocol.wave.model.waveref.WaveRef; +import org.waveprotocol.wave.util.escapers.jvm.JavaWaverefEncoder; +import org.waveprotocol.wave.util.logging.Log; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.logging.Level; + +/** Indexes conversation-root wavelets into Solr by posting one document per non-empty blip to the Solr JSON update handler. + * @author Frank R. <[email protected]> + */ +@Singleton +public class SolrWaveIndexerImpl extends AbstractWaveIndexer implements WaveBus.Subscriber, + PerUserWaveViewBus.Listener { + + private static final Log LOG = Log.get(SolrWaveIndexerImpl.class); + + // TODO (Yuri Z.): Inject executor. NOTE(review): declared as Executor rather than ExecutorService, so this shared single-thread executor cannot be shut down through this field. + private static final Executor executor = Executors.newSingleThreadExecutor(); + + /* + * (regression alert) for getting the title of a wave + */ + // private final ConversationUtil conversationUtil; + + private final ReadableWaveletDataProvider waveletDataProvider; + + /*- + * copied with modifications from + * org.waveprotocol.box.common.Snippets.collateTextForOps(Iterable<DocOp>) + * + * replaced white space character with new line + */ + /** + * Concatenates all of the text of the specified blip into a single String, appending a newline for each line element. + * + * @param doc the blip whose content is read. + * @return A String containing the characters of the blip, with a newline at each line element.
+ */ + public static String readText(ReadableBlipData doc) { + + final StringBuilder resultBuilder = new StringBuilder(); + + DocOp docOp = doc.getContent().asOperation(); + docOp.apply(InitializationCursorAdapter.adapt(new DocOpCursor() { + @Override + public void characters(String s) { + resultBuilder.append(s); + } + + @Override + public void annotationBoundary(AnnotationBoundaryMap map) { + } + + @Override + public void elementStart(String type, Attributes attrs) { + if (type.equals(DocumentConstants.LINE)) { + resultBuilder.append("\n"); + } + } + + @Override + public void elementEnd() { + } + + @Override + public void retain(int itemCount) { + } + + @Override + public void deleteCharacters(String chars) { + } + + @Override + public void deleteElementStart(String type, Attributes attrs) { + } + + @Override + public void deleteElementEnd() { + } + + @Override + public void replaceAttributes(Attributes oldAttrs, Attributes newAttrs) { + } + + @Override + public void updateAttributes(AttributesUpdate attrUpdate) { + } + })); + + return resultBuilder.toString(); + } + + /** Stores the wavelet data provider and subscribes this indexer to notificationDispatcher, presumably so that waveletUpdate and waveletCommitted are called on wavelet changes. NOTE(review): subscribing from the constructor publishes this before construction completes; conversationUtil is currently unused. */ @Inject + public SolrWaveIndexerImpl(WaveMap waveMap, WaveletProvider waveletProvider, + ReadableWaveletDataProvider waveletDataProvider, ConversationUtil conversationUtil, + WaveletNotificationDispatcher notificationDispatcher) { + + super(waveMap, waveletProvider); + + this.waveletDataProvider = waveletDataProvider; + + /* + * (regression alert) for getting the title of a wave + */ + // this.conversationUtil = conversationUtil; + + notificationDispatcher.subscribe(this); + + return; + } + + @Override + public ListenableFuture<Void> onParticipantAdded(final WaveletName waveletName, + ParticipantId participant) { + /* + * ignored. See waveletCommitted(WaveletName, HashedVersion). NOTE(review): returns null rather than a completed future; confirm PerUserWaveViewBus callers tolerate a null result. + */ + return null; + } + + @Override + public ListenableFuture<Void> onParticipantRemoved(final WaveletName waveletName, + ParticipantId participant) { + /* + * ignored.
See waveletCommitted(WaveletName, HashedVersion) + */ + return null; + } + + /** Loads the wavelet on the shared single-thread executor and posts it to Solr via updateIndex; the returned task fails if loading (a WaveServerException, which is also logged) or indexing throws. */ @Override + public ListenableFuture<Void> onWaveInit(final WaveletName waveletName) { + + ListenableFutureTask<Void> task = ListenableFutureTask.create(new Callable<Void>() { + + @Override + public Void call() throws Exception { + ReadableWaveletData waveletData; + try { + waveletData = waveletDataProvider.getReadableWaveletData(waveletName); + updateIndex(waveletData); + } catch (WaveServerException e) { + LOG.log(Level.SEVERE, "Failed to initialize index for " + waveletName, e); + throw e; + } + return null; + } + }); + executor.execute(task); + return task; + } + + @Override + protected void processWavelet(WaveletName waveletName) { + /* NOTE(review): the future returned by onWaveInit is discarded, so failures are not observed here (only WaveServerException is logged inside the task); if postIndexHook runs right after processing, it may unload wavelets while queued tasks are still pending - verify the ordering in AbstractWaveIndexer. */ onWaveInit(waveletName); + } + + @Override + protected void postIndexHook() { + try { + getWaveMap().unloadAllWavelets(); + } catch (WaveletStateException e) { + throw new IndexException("Problem encountered while cleaning up", e); + } + } + + /** Posts one Solr document per non-empty blip of a conversation-root wavelet (other wavelets are ignored) to the JSON update handler with commit=true; throws IndexException on a non-200 status or an IOException. NOTE(review): this method only adds or overwrites documents, so documents of blips that were deleted or became empty keep their old text and participant list - confirm nothing else removes them. */ private void updateIndex(ReadableWaveletData wavelet) throws IndexException { + + Preconditions.checkNotNull(wavelet); + + if (!IdUtil.isConversationRootWaveletId(wavelet.getWaveletId())) { + return; + } + + PostMethod postMethod = + new PostMethod(SolrSearchProviderImpl.SOLR_BASE_URL + "/update/json?commit=true"); + try { + JsonArray docsJson = new JsonArray(); + + String waveId = wavelet.getWaveId().serialise(); + String waveletId = wavelet.getWaveletId().serialise(); + String modified = Long.toString(wavelet.getLastModifiedTime()); + String creator = wavelet.getCreator().getAddress(); + + /* + * (regression alert) gets wave title - too much overhead for updating + * index.
on rendering search results, solr-bot builds link texts with + * WaveDigester + */ + // String title = + // getTitle(wavelet.getWaveId(), wavelet.getWaveletId(), waveMap, + // conversationUtil); + + for (String docName : wavelet.getDocumentIds()) { + ReadableBlipData document = wavelet.getDocument(docName); + + /* + * skips non-blip documents + */ + if (!IdUtil.isBlipId(docName)) { + continue; + } + + String text = readText(document); + + + /* + * (regression alert) it hangs at + * com.google.common.collect.Iterables.cycle(T...) + */ + // String text = + // Snippets + // .collateTextForOps(Iterables.cycle((DocOp) + // document.getContent().asOperation())); + + /* + * (regression alert) cannot reuse Snippets because it trims the + * content. + */ + // Iterable<DocOp> docs = Arrays.asList((DocOp) + // document.getContent().asOperation()); + // String text = Snippets.collateTextForOps(docs); + + /*- + * XXX (Frank R.) (experimental) skips invisible blips + * a newly created blip starts with (and contains only) + * a new line character, and is not treated as invisible + */ + if (text.length() == 0) { + continue; + } + + /* NOTE(review): the participant list does not depend on the blip and could be built once before the loop; the same JsonArray instance is added under both WITH and WITH_FUZZY. */ JsonArray participantsJson = new JsonArray(); + for (ParticipantId participant : wavelet.getParticipants()) { + String participantAddress = participant.toString(); + participantsJson.add(new JsonPrimitive(participantAddress)); + } + + /* + * id will be something like waveId + "/~/conv+root/" + docName + */ + String id = + JavaWaverefEncoder.encodeToUriPathSegment(WaveRef.of(wavelet.getWaveId(), + wavelet.getWaveletId(), docName)); + + JsonObject docJson = new JsonObject(); + docJson.addProperty(SolrSearchProviderImpl.ID, id); + docJson.addProperty(SolrSearchProviderImpl.WAVE_ID, waveId); + docJson.addProperty(SolrSearchProviderImpl.WAVELET_ID, waveletId); + docJson.addProperty(SolrSearchProviderImpl.DOC_NAME, docName); + docJson.addProperty(SolrSearchProviderImpl.LMT, modified); + docJson.add(SolrSearchProviderImpl.WITH, participantsJson); +
docJson.add(SolrSearchProviderImpl.WITH_FUZZY, participantsJson); + docJson.addProperty(SolrSearchProviderImpl.CREATOR, creator); + docJson.addProperty(SolrSearchProviderImpl.TEXT, text); + /* NOTE(review): every document is tagged with the inbox folder unconditionally. */ docJson.addProperty(SolrSearchProviderImpl.IN, "inbox"); + + docsJson.add(docJson); + } + + RequestEntity requestEntity = + new StringRequestEntity(docsJson.toString(), "application/json", "UTF-8"); + postMethod.setRequestEntity(requestEntity); + + HttpClient httpClient = new HttpClient(); + int statusCode = httpClient.executeMethod(postMethod); + if (statusCode != HttpStatus.SC_OK) { + /* NOTE(review): the HTTP status code is not included in the exception. */ throw new IndexException(waveId); + } + + } catch (IOException e) { + throw new IndexException(String.valueOf(wavelet.getWaveletId()), e); + } finally { + postMethod.releaseConnection(); + } + + return; + } + + /* + * TODO (Frank R.) move to TitleHelper. Currently, WaveletContainer is of + * default visibility, which prevents the refactoring. + */ + /** Returns the trimmed title of the first blip of the wavelet's root conversation, or null when no conversation is found or the wavelet cannot be accessed; appears unused in the visible part of this file (its only call is commented out in updateIndex). NOTE(review): if neither lookup below yields a container, waveletContainer stays null and copyWaveletData() throws an uncaught NullPointerException. */ public static String getTitle(WaveId waveId, WaveletId waveletId, WaveMap waveMap, + ConversationUtil conversationUtil) { + + String title = null; + + WaveViewDataImpl wave = WaveViewDataImpl.create(waveId); + + WaveletContainer waveletContainer = null; + WaveletName waveletname = WaveletName.of(waveId, waveletId); + + /*- + * copied from + * + org.waveprotocol.box.server.waveserver.SimpleSearchProviderImpl.filterWavesViewBySearchCriteria(Function<ReadableWaveletData, + Boolean>, Multimap<WaveId, WaveletId>) + */ + // TODO (alown): Find some way to use isLocalWavelet to do this properly! + try { + if (LOG.isFineLoggable()) { + LOG.fine("Trying as a remote wavelet"); + } + waveletContainer = waveMap.getRemoteWavelet(waveletname); + } catch (WaveletStateException e) { + LOG.severe(String.format("Failed to get remote wavelet %s", waveletname.toString()), e); + } catch (NullPointerException e) { + // This is a fairly normal case of it being a local-only wave. + // Yet this only seems to appear in the test suite.
+ // Continuing is completely harmless here. + LOG.info(String.format("%s is definitely not a remote wavelet. (Null key)", + waveletname.toString()), e); + } + + if (waveletContainer == null) { + try { + if (LOG.isFineLoggable()) { + LOG.fine("Trying as a local wavelet"); + } + waveletContainer = waveMap.getLocalWavelet(waveletname); + } catch (WaveletStateException e) { + LOG.severe(String.format("Failed to get local wavelet %s", waveletname.toString()), e); + } + } + + try { + /* NOTE(review): waveletContainer is still null here if both lookups failed. */ wave.addWavelet(waveletContainer.copyWaveletData()); + for (ObservableWaveletData waveletData : wave.getWavelets()) { + OpBasedWavelet wavelet = OpBasedWavelet.createReadOnly(waveletData); + if (WaveletBasedConversation.waveletHasConversation(wavelet)) { + ObservableConversationView conversations = conversationUtil.buildConversation(wavelet); + ObservableConversation root = conversations.getRoot(); + ObservableConversationBlip firstBlip = root.getRootThread().getFirstBlip(); + Document firstBlipContents = firstBlip.getContent(); + title = TitleHelper.extractTitle(firstBlipContents).trim(); + break; + } + } + } catch (WaveletStateException e) { + LOG.warning("Failed to access wavelet " + waveletContainer.getWaveletName(), e); + } + + return title; + } + + /** No-op: indexing is triggered from waveletCommitted instead, as the comment below explains. */ @Override + public void waveletUpdate(final ReadableWaveletData wavelet, DeltaSequence deltas) { + /* + * (regression alert) commented out for optimization, see + * waveletCommitted(WaveletName, HashedVersion) + */ + // updateIndex(wavelet); + } + + @Override + public void waveletCommitted(final WaveletName waveletName, final HashedVersion version) { + + Preconditions.checkNotNull(waveletName); + + /* + * (regression alert) don't update on current thread to prevent lock error + */ + ListenableFutureTask<Void> task = ListenableFutureTask.create(new Callable<Void>() { + + @Override + public Void call() throws Exception { + ReadableWaveletData waveletData; + try { + waveletData = waveletDataProvider.getReadableWaveletData(waveletName); +
LOG.fine("commit " + version + " " + waveletData.getVersion()); + if (waveletData.getVersion() == version.getVersion()) { + updateIndex(waveletData); + } + } catch (WaveServerException e) { + LOG.log(Level.SEVERE, "Failed to update index for " + waveletName, e); + throw e; + } + return null; + } + }); + executor.execute(task); + + return; + } + + @Override + public synchronized void remakeIndex() throws WaveletStateException, WaveServerException { + + /*- + * to fully rebuild the index, need to delete everything first + * the <query> tag should contain the value of + * org.waveprotocol.box.server.waveserver.SolrSearchProviderImpl.Q + * + * http://localhost:8983/solr/update?stream.body=<delete><query>waveId_s:[*%20TO%20*]%20AND%20waveletId_s:[*%20TO%20*]%20AND%20docName_s:[*%20TO%20*]%20AND%20lmt_l:[*%20TO%20*]%20AND%20with_ss:[*%20TO%20*]%20AND%20with_txt:[*%20TO%20*]%20AND%20creator_t:[*%20TO%20*]</query></delete> + * http://localhost:8983/solr/update?stream.body=<commit/> + * + * see + * http://wiki.apache.org/solr/FAQ#How_can_I_delete_all_documents_from_my_index.3F + */ + + GetMethod getMethod = new GetMethod(); + try { + getMethod + .setURI(new URI(SolrSearchProviderImpl.SOLR_BASE_URL + "/update?wt=json" + + "&stream.body=<delete><query>" + SolrSearchProviderImpl.Q + "</query></delete>", + false)); + + HttpClient httpClient = new HttpClient(); + int statusCode = httpClient.executeMethod(getMethod); + if (statusCode == HttpStatus.SC_OK) { + getMethod.setURI(new URI(SolrSearchProviderImpl.SOLR_BASE_URL + "/update?wt=json" + + "&stream.body=<commit/>", false)); + + httpClient = new HttpClient(); + statusCode = httpClient.executeMethod(getMethod); + if (statusCode != HttpStatus.SC_OK) { + LOG.warning("failed to clean solr index"); + } + } else { + LOG.warning("failed to clean solr index"); + } + } catch (Exception e) { + LOG.warning("failed to clean solr index", e); + } finally { + getMethod.releaseConnection(); + } + + super.remakeIndex(); + + return; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/da20b429/src/org/waveprotocol/box/server/waveserver/WaveDigester.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/WaveDigester.java b/src/org/waveprotocol/box/server/waveserver/WaveDigester.java index b103bbb..25d19d6 100644 --- a/src/org/waveprotocol/box/server/waveserver/WaveDigester.java +++ b/src/org/waveprotocol/box/server/waveserver/WaveDigester.java @@ -42,8 +42,8 @@ import org.waveprotocol.wave.model.supplement.PrimitiveSupplement; import org.waveprotocol.wave.model.supplement.PrimitiveSupplementImpl; import org.waveprotocol.wave.model.supplement.SupplementedWave; import org.waveprotocol.wave.model.supplement.SupplementedWaveImpl; -import org.waveprotocol.wave.model.supplement.WaveletBasedSupplement; import org.waveprotocol.wave.model.supplement.SupplementedWaveImpl.DefaultFollow; +import org.waveprotocol.wave.model.supplement.WaveletBasedSupplement; import org.waveprotocol.wave.model.util.CollectionUtils; import org.waveprotocol.wave.model.wave.ParticipantId; import org.waveprotocol.wave.model.wave.data.ObservableWaveletData; @@ -58,7 +58,7 @@ import java.util.List; /** * Generates digests for the search service. - * + * * @author [email protected] */ public class WaveDigester { @@ -85,49 +85,58 @@ public class WaveDigester { return result; } for (WaveViewData wave : results) { - // Note: the indexing infrastructure only supports single-conversation - // waves, and requires raw wavelet access for snippeting.
- ObservableWaveletData root = null; - ObservableWaveletData other = null; - ObservableWaveletData udw = null; - for (ObservableWaveletData waveletData : wave.getWavelets()) { - WaveletId waveletId = waveletData.getWaveletId(); - if (IdUtil.isConversationRootWaveletId(waveletId)) { - root = waveletData; - } else if (IdUtil.isConversationalId(waveletId)) { - other = waveletData; - } else if (IdUtil.isUserDataWavelet(waveletId)) { - // assume this is the user data wavelet for the right user. - udw = waveletData; - } - } + result.addDigest(build(participant, wave)); + } - ObservableWaveletData convWavelet = root != null ? root : other; - SupplementedWave supplement = null; - ObservableConversationView conversations = null; - if (convWavelet != null) { - OpBasedWavelet wavelet = OpBasedWavelet.createReadOnly(convWavelet); - if (WaveletBasedConversation.waveletHasConversation(wavelet)) { - conversations = conversationUtil.buildConversation(wavelet); - supplement = buildSupplement(participant, conversations, udw); - } + assert result.getDigests().size() == results.size(); + return result; + } + /** Builds the digest of a single wave for {@code participant}: a conversational digest if the wave has a conversation, otherwise an empty or unknown digest. */ + public Digest build(ParticipantId participant, WaveViewData wave) { + + Digest digest; + + // Note: the indexing infrastructure only supports single-conversation + // waves, and requires raw wavelet access for snippeting. + ObservableWaveletData root = null; + ObservableWaveletData other = null; + ObservableWaveletData udw = null; + for (ObservableWaveletData waveletData : wave.getWavelets()) { + WaveletId waveletId = waveletData.getWaveletId(); + if (IdUtil.isConversationRootWaveletId(waveletId)) { + root = waveletData; + } else if (IdUtil.isConversationalId(waveletId)) { + other = waveletData; + } else if (IdUtil.isUserDataWavelet(waveletId)) { + // assume this is the user data wavelet for the right user. + udw = waveletData; } - if (conversations != null) { - // This is a conversational wave. Produce a conversational digest.
- result.addDigest(generateDigest(conversations, supplement, convWavelet)); - } else { - // It is unknown how to present this wave. - result.addDigest(generateEmptyorUnknownDigest(wave)); + } + + ObservableWaveletData convWavelet = root != null ? root : other; + SupplementedWave supplement = null; + ObservableConversationView conversations = null; + if (convWavelet != null) { + OpBasedWavelet wavelet = OpBasedWavelet.createReadOnly(convWavelet); + if (WaveletBasedConversation.waveletHasConversation(wavelet)) { + conversations = conversationUtil.buildConversation(wavelet); + supplement = buildSupplement(participant, conversations, udw); } } + if (conversations != null) { + // This is a conversational wave. Produce a conversational digest. + digest = generateDigest(conversations, supplement, convWavelet); + } else { + // It is unknown how to present this wave. + digest = generateEmptyorUnknownDigest(wave); + } - assert result.getDigests().size() == results.size(); - return result; + return digest; } /** * Produces a digest for a set of conversations. Never returns null. - * + * * @param conversations the conversation. * @param supplement the supplement that allows to easily perform various * queries on user related state of the wavelet. @@ -211,7 +220,7 @@ public class WaveDigester { /** * Generates an empty digest in case the wave is empty, or an unknown digest * otherwise. - * + * * @param wave the wave. * @return the generated digest. */ @@ -223,7 +232,7 @@ public class WaveDigester { /** * Builds the supplement model from a wave. Never returns null. - * + * * @param viewer the participant for which the supplement is constructed. * @param conversations conversations in the wave * @param udw the user data wavelet for the logged user.
