Repository: incubator-unomi Updated Branches: refs/heads/master cd3bc9ff0 -> 2f687915a
UNOMI-87 : Rewrite the queries for the scoring plans update Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/2f687915 Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/2f687915 Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/2f687915 Branch: refs/heads/master Commit: 2f687915a8929af291e43ee2536ba4208e752496 Parents: cd3bc9f Author: Abdelkader Midani <amid...@apache.org> Authored: Mon Mar 20 12:09:36 2017 +0100 Committer: Abdelkader Midani <amid...@apache.org> Committed: Mon Mar 20 12:09:42 2017 +0100 ---------------------------------------------------------------------- .../ElasticSearchPersistenceServiceImpl.java | 124 ++++++++++++++----- ...g.apache.unomi.persistence.elasticsearch.cfg | 4 +- persistence-elasticsearch/pom.xml | 1 - .../persistence/spi/PersistenceService.java | 12 ++ .../PropertyConditionESQueryBuilder.java | 3 + .../conditions/PropertyConditionEvaluator.java | 3 - pom.xml | 2 +- .../services/services/SegmentServiceImpl.java | 119 +++++++++--------- 8 files changed, 169 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java index e2d1645..4e66418 100644 --- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java +++ 
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java @@ -56,7 +56,11 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.reindex.BulkIndexByScrollResponse; +import org.elasticsearch.index.reindex.UpdateByQueryAction; +import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder; import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptException; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -92,22 +96,21 @@ import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; + @SuppressWarnings("rawtypes") public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener { - private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); - public static final String NUMBER_OF_SHARDS = "number_of_shards"; public static final String NUMBER_OF_REPLICAS = "number_of_replicas"; public static final String CLUSTER_NAME = "cluster.name"; - public static final String BULK_PROCESSOR_NAME = "bulkProcessor.name"; public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests"; public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions"; public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize"; public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval"; public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy"; - + private static final Logger logger = 
LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName()); private TransportClient client; private BulkProcessor bulkProcessor; private String elasticSearchAddresses; @@ -123,7 +126,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher; private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher; - private Map<String,String> indexNames; + private Map<String, String> indexNames; private List<String> itemsMonthlyIndexed; private Map<String, String> routingByType; private Set<String> existingIndexNames = new TreeSet<String>(); @@ -135,7 +138,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, private String bulkProcessorName = "unomi-bulk"; private String bulkProcessorConcurrentRequests = "1"; private String bulkProcessorBulkActions = "1000"; - private String bulkProcessorBulkSize= "5MB"; + private String bulkProcessorBulkSize = "5MB"; private String bulkProcessorFlushInterval = "5s"; private String bulkProcessorBackoffPolicy = "exponential"; @@ -306,8 +309,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } if (!indexExists) { logger.info("{} index doesn't exist yet, creating it...", indexName); - Map<String,String> indexMappings = new HashMap<String,String>(); - indexMappings.put("_default_",mappings.get("_default_")); + Map<String, String> indexMappings = new HashMap<String, String>(); + indexMappings.put("_default_", mappings.get("_default_")); for (Map.Entry<String, String> entry : mappings.entrySet()) { if (!itemsMonthlyIndexed.contains(entry.getKey()) && !indexNames.containsKey(entry.getKey())) { indexMappings.put(entry.getKey(), entry.getValue()); @@ -455,7 +458,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos); int paramSeparatorPos = backoffPolicyStr.indexOf(",", 
paramStartPos); TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY); - int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos)); + int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos)); bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(delay, maxNumberOfRetries)); } else if (backoffPolicyStr.startsWith("exponential")) { if (!backoffPolicyStr.contains("(")) { @@ -466,7 +469,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, int paramEndPos = backoffPolicyStr.indexOf(")", paramStartPos); int paramSeparatorPos = backoffPolicyStr.indexOf(",", paramStartPos); TimeValue delay = TimeValue.parseTimeValue(backoffPolicyStr.substring(paramStartPos, paramSeparatorPos), new TimeValue(5, TimeUnit.SECONDS), BULK_PROCESSOR_BACKOFF_POLICY); - int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos+1, paramEndPos)); + int maxNumberOfRetries = Integer.parseInt(backoffPolicyStr.substring(paramSeparatorPos + 1, paramEndPos)); bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(delay, maxNumberOfRetries)); } } @@ -551,8 +554,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, if (!indexExists) { logger.info("{} index doesn't exist yet, creating it...", monthlyIndexName); - Map<String,String> indexMappings = new HashMap<String,String>(); - indexMappings.put("_default_",mappings.get("_default_")); + Map<String, String> indexMappings = new HashMap<String, String>(); + indexMappings.put("_default_", mappings.get("_default_")); for (Map.Entry<String, String> entry : mappings.entrySet()) { if (itemsMonthlyIndexed.contains(entry.getKey())) { indexMappings.put(entry.getKey(), entry.getValue()); @@ -749,6 +752,60 @@ public class 
ElasticSearchPersistenceServiceImpl implements PersistenceService, } @Override + public boolean updateWithQueryAndScript(final Date dateHint, final Class<?> clazz, final String[] scripts, final Map<String, Object>[] scriptParams, final Condition[] conditions) { + return new InClassLoaderExecute<Boolean>() { + protected Boolean execute(Object... args) throws Exception { + try { + String itemType = (String) clazz.getField("ITEM_TYPE").get(null); + + String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : + (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName); + + for (int i = 0; i < scripts.length; i++) { + Script actualScript = new Script(ScriptType.INLINE, "painless", scripts[i], scriptParams[i]); + + client.admin().indices().prepareRefresh(index).get(); + + UpdateByQueryRequestBuilder ubqrb = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); + ubqrb.source(index).source().setTypes(itemType); + BulkIndexByScrollResponse response = ubqrb.setSlices(2) + .setMaxRetries(1000).abortOnVersionConflict(false).script(actualScript) + .filter(conditionESQueryBuilderDispatcher.buildFilter(conditions[i])).get(); + if (response.getBulkFailures().size() > 0) { + for (BulkItemResponse.Failure failure : response.getBulkFailures()) { + logger.error("Failure : cause={} , message={}", failure.getCause(), failure.getMessage()); + } + } else { + logger.info("Update By Query has processed {} in {}.", response.getUpdated(), response.getTook().toString()); + } + if (response.isTimedOut()) { + logger.error("Update By Query ended with timeout!"); + } + if (response.getVersionConflicts() > 0) { + logger.warn("Update By Query ended with {} Version Conflicts!", response.getVersionConflicts()); + } + if (response.getNoops() > 0) { + logger.warn("Update By Query ended with {} noops!", response.getNoops()); + } + } + return true; + } catch (IndexNotFoundException e) { + throw new Exception("No index found for itemType=" + 
clazz.getName(), e); + } catch (NoSuchFieldException e) { + throw new Exception("Error updating item ", e); + } catch (IllegalAccessException e) { + throw new Exception("Error updating item ", e); + } catch (ScriptException e) { + logger.error("Error in the update script : {}\n{}\n{}", e.getScript(), e.getDetailedMessage(), e.getScriptStack()); + throw new Exception("Error in the update script"); + } finally { + return false; + } + } + }.catchingExecuteInClassLoader(true); + } + + @Override public boolean updateWithScript(final String itemId, final Date dateHint, final Class<?> clazz, final String script, final Map<String, Object> scriptParams) { return new InClassLoaderExecute<Boolean>() { protected Boolean execute(Object... args) throws Exception { @@ -758,13 +815,15 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, String index = indexNames.containsKey(itemType) ? indexNames.get(itemType) : (itemsMonthlyIndexed.contains(itemType) && dateHint != null ? getMonthlyIndex(dateHint) : indexName); - Script actualScript = new Script(ScriptType.INLINE, "groovy", script, scriptParams); + Script actualScript = new Script(ScriptType.INLINE, "painless", script, scriptParams); + if (bulkProcessor == null) { client.prepareUpdate(index, itemType, itemId).setScript(actualScript) .execute() .actionGet(); } else { - UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript).request(); + UpdateRequest updateRequest = client.prepareUpdate(index, itemType, itemId).setScript(actualScript). 
+ setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).request(); bulkProcessor.add(updateRequest); } return true; @@ -852,8 +911,8 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, IndicesExistsResponse indicesExistsResponse = client.admin().indices().prepareExists(indexName).execute().actionGet(); boolean indexExists = indicesExistsResponse.isExists(); if (!indexExists) { - Map<String,String> indexMappings = new HashMap<String,String>(); - indexMappings.put("_default_",mappings.get("_default_")); + Map<String, String> indexMappings = new HashMap<String, String>(); + indexMappings.put("_default_", mappings.get("_default_")); for (Map.Entry<String, String> entry : mappings.entrySet()) { if (indexNames.containsKey(entry.getKey()) && indexNames.get(entry.getKey()).equals(indexName)) { indexMappings.put(entry.getKey(), entry.getValue()); @@ -880,7 +939,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, }.catchingExecuteInClassLoader(true); } - private void internalCreateIndex(String indexName, Map<String,String> mappings) { + private void internalCreateIndex(String indexName, Map<String, String> mappings) { CreateIndexRequestBuilder builder = client.admin().indices().prepareCreate(indexName) .setSettings("{\n" + " \"index\" : {\n" + @@ -995,14 +1054,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } private String getPropertyNameWithData(String name, String itemType) { - Map<String,Object> propertyMapping = getPropertyMapping(name,itemType); + Map<String, Object> propertyMapping = getPropertyMapping(name, itemType); if (propertyMapping == null) { return null; } if (propertyMapping != null && "text".equals(propertyMapping.get("type")) && propertyMapping.containsKey("fields") - && ((Map)propertyMapping.get("fields")).containsKey("keyword")) { + && ((Map) propertyMapping.get("fields")).containsKey("keyword")) { name += ".keyword"; } return name; @@ -1107,12 +1166,12 
@@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, @Override public <T extends Item> PartialList<T> query(String fieldName, String fieldValue, String sortBy, Class<T> clazz, int offset, int size) { - return query(QueryBuilders.termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null); + return query(termQuery(fieldName, ConditionContextHelper.foldToASCII(fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null); } @Override public <T extends Item> PartialList<T> queryFullText(String fieldName, String fieldValue, String fulltext, String sortBy, Class<T> clazz, int offset, int size) { - return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(QueryBuilders.termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null); + return query(QueryBuilders.boolQuery().must(QueryBuilders.queryStringQuery(fulltext).defaultField("_all")).must(termQuery(fieldName, fieldValue)), sortBy, clazz, offset, size, getRouting(fieldName, new String[]{fieldValue}, clazz), null); } @Override @@ -1207,7 +1266,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, requestBuilder = requestBuilder.addSort(distanceSortBuilder.order(SortOrder.ASC)); } } else { - String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement,":"), itemType); + String name = getPropertyNameWithData(StringUtils.substringBeforeLast(sortByElement, ":"), itemType); if (name != null) { if (sortByElement.endsWith(":desc")) { requestBuilder = requestBuilder.addSort(name, SortOrder.DESC); @@ -1451,7 +1510,6 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } - @Override public void refresh() { new InClassLoaderExecute<Boolean>() { @@ -1514,7 
+1572,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, new InClassLoaderExecute<Void>() { @Override protected Void execute(Object... args) { - QueryBuilder query = QueryBuilders.termQuery("scope", scope); + QueryBuilder query = termQuery("scope", scope); BulkRequestBuilder deleteByScope = client.prepareBulk(); @@ -1610,6 +1668,14 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, (itemsMonthlyIndexed.contains(itemType) ? indexName + "-*" : indexName); } + private String getConfig(Map<String, String> settings, String key, + String defaultValue) { + if (settings != null && settings.get(key) != null) { + return settings.get(key); + } + return defaultValue; + } + public abstract static class InClassLoaderExecute<T> { protected abstract T execute(Object... args) throws Exception; @@ -1624,7 +1690,7 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - public T catchingExecuteInClassLoader( boolean logError, Object... args) { + public T catchingExecuteInClassLoader(boolean logError, Object... 
args) { try { return executeInClassLoader(args); } catch (Exception e) { @@ -1634,13 +1700,5 @@ public class ElasticSearchPersistenceServiceImpl implements PersistenceService, } } - private String getConfig(Map<String,String> settings, String key, - String defaultValue) { - if (settings != null && settings.get(key) != null) { - return settings.get(key); - } - return defaultValue; - } - } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg index 4e52a1c..12980e1 100644 --- a/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg +++ b/persistence-elasticsearch/core/src/main/resources/org.apache.unomi.persistence.elasticsearch.cfg @@ -15,7 +15,7 @@ # limitations under the License. # -cluster.name=contextElasticSearch +cluster.name=contextElasticSearch_amidani # The elasticSearchAddresses may be a comma seperated list of host names and ports such as # hostA:9300,hostB:9300 # Note: the port number must be repeated for each host. 
@@ -42,4 +42,4 @@ bulkProcessor.backoffPolicy=exponential # for each node in the ElasticSearch cluster: # minimalElasticSearchVersion <= ElasticSearch node version < maximalElasticSearchVersion minimalElasticSearchVersion=5.0.0 -maximalElasticSearchVersion=5.2.0 \ No newline at end of file +maximalElasticSearchVersion=5.3.0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/persistence-elasticsearch/pom.xml b/persistence-elasticsearch/pom.xml index 5d25fd5..70a85ea 100644 --- a/persistence-elasticsearch/pom.xml +++ b/persistence-elasticsearch/pom.xml @@ -32,7 +32,6 @@ <modules> <module>core</module> - <module>plugins</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java ---------------------------------------------------------------------- diff --git a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java index a6b175f..1397659 100644 --- a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java +++ b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/PersistenceService.java @@ -116,6 +116,18 @@ public interface PersistenceService { boolean updateWithScript(String itemId, Date dateHint, Class<?> clazz, String script, Map<String, Object> scriptParams); /** + * Updates the items of the specified class by a query with a new property value for the specified property name based on a provided script. 
+ * + * @param dateHint a Date helping in identifying where the item is located + * @param clazz the Item subclass of the item to update + * @param scripts inline scripts array + * @param scriptParams script params array + * @param conditions conditions array + * @return {@code true} if the update was successful, {@code false} otherwise + */ + boolean updateWithQueryAndScript(Date dateHint, Class<?> clazz, String[] scripts, Map<String, Object>[] scriptParams, Condition[] conditions); + + /** * Retrieves the item identified with the specified identifier and with the specified Item subclass if it exists. * * @param <T> the type of the Item subclass we want to retrieve http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java ---------------------------------------------------------------------- diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java index e83bad2..b1aa234 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionESQueryBuilder.java @@ -87,6 +87,9 @@ public class PropertyConditionESQueryBuilder implements ConditionESQueryBuilder case "contains": checkRequiredValue(expectedValue, name, comparisonOperator, false); return QueryBuilders.regexpQuery(name, ".*" + expectedValue + ".*"); + case "notContains": + checkRequiredValue(expectedValue, name, comparisonOperator, false); + return QueryBuilders.boolQuery().mustNot(QueryBuilders.regexpQuery(name, ".*" + expectedValue + ".*")); case "startsWith": checkRequiredValue(expectedValue, name, comparisonOperator, false); 
return QueryBuilders.prefixQuery(name, expectedValue); http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java ---------------------------------------------------------------------- diff --git a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java index 1de3ae0..d7c2d2c 100644 --- a/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java +++ b/plugins/baseplugin/src/main/java/org/apache/unomi/plugins/baseplugin/conditions/PropertyConditionEvaluator.java @@ -22,7 +22,6 @@ import ognl.Ognl; import ognl.OgnlContext; import ognl.OgnlException; import ognl.enhance.ExpressionAccessor; -import org.apache.commons.beanutils.BeanUtilsBean; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.unomi.api.Event; @@ -52,8 +51,6 @@ public class PropertyConditionEvaluator implements ConditionEvaluator { private static final SimpleDateFormat yearMonthDayDateFormat = new SimpleDateFormat("yyyyMMdd"); - private BeanUtilsBean beanUtilsBean = BeanUtilsBean.getInstance(); - private Map<String, Map<String, ExpressionAccessor>> expressionCache = new HashMap<>(64); private int compare(Object actualValue, String expectedValue, Object expectedValueDate, Object expectedValueInteger, Object expectedValueDateExpr) { http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9a0872c..18bc4c1 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ <version.karaf>3.0.8</version.karaf> <version.karaf.cellar>3.0.3</version.karaf.cellar> <version.pax.exam>4.9.1</version.pax.exam> 
- <elasticsearch.version>5.1.1</elasticsearch.version> + <elasticsearch.version>5.2.2</elasticsearch.version> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/2f687915/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java ---------------------------------------------------------------------- diff --git a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java index 770936d..4367b4d 100644 --- a/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/services/SegmentServiceImpl.java @@ -147,7 +147,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList } private void cancelTimers() { - if(segmentTimer != null) { + if (segmentTimer != null) { segmentTimer.cancel(); } logger.info("Segment purge: Purge unscheduled"); @@ -258,7 +258,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList } private boolean checkSegmentDeletionImpact(Condition condition, String segmentToDeleteId) { - if(condition != null) { + if (condition != null) { @SuppressWarnings("unchecked") final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); if (subConditions != null) { @@ -283,6 +283,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList * Return an updated condition that do not contain a condition on the segmentId anymore * it's remove the unnecessary boolean condition (if a condition is the only one of a boolean the boolean will be remove and the subcondition returned) * it's return null when there is no more condition after (if the condition passed was only a segment condition on the segmentId) + * * @param condition the condition to 
update * @param segmentId the segment id to remove in the condition * @return updated condition @@ -294,12 +295,12 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList List<Condition> updatedSubConditions = new LinkedList<>(); for (Condition subCondition : subConditions) { Condition updatedCondition = updateSegmentDependentCondition(subCondition, segmentId); - if(updatedCondition != null) { + if (updatedCondition != null) { updatedSubConditions.add(updatedCondition); } } - if(!updatedSubConditions.isEmpty()){ - if(updatedSubConditions.size() == 1) { + if (!updatedSubConditions.isEmpty()) { + if (updatedSubConditions.size() == 1) { return updatedSubConditions.get(0); } else { condition.setParameter("subConditions", updatedSubConditions); @@ -308,12 +309,12 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList } else { return null; } - } else if("profileSegmentCondition".equals(condition.getConditionTypeId())) { + } else if ("profileSegmentCondition".equals(condition.getConditionTypeId())) { @SuppressWarnings("unchecked") final List<String> referencedSegmentIds = (List<String>) condition.getParameter("segments"); if (referencedSegmentIds.indexOf(segmentId) >= 0) { referencedSegmentIds.remove(segmentId); - if(referencedSegmentIds.isEmpty()) { + if (referencedSegmentIds.isEmpty()) { return null; } else { condition.setParameter("segments", referencedSegmentIds); @@ -379,7 +380,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList for (Segment segment : impactedSegments) { Condition updatedCondition = updateSegmentDependentCondition(segment.getCondition(), segmentId); segment.setCondition(updatedCondition); - if(updatedCondition == null) { + if (updatedCondition == null) { clearAutoGeneratedRules(persistenceService.query("linkedItems", segment.getMetadata().getId(), null, Rule.class), segment.getMetadata().getId()); segment.getMetadata().setEnabled(false); } @@ -455,7 +456,7 @@ 
public class SegmentServiceImpl implements SegmentService, SynchronousBundleList public SegmentsAndScores getSegmentsAndScoresForProfile(Profile profile) { Set<String> segments = new HashSet<String>(); - Map<String,Integer> scores = new HashMap<String, Integer>(); + Map<String, Integer> scores = new HashMap<String, Integer>(); List<Segment> allSegments = this.allSegments; for (Segment segment : allSegments) { @@ -542,18 +543,18 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList persistenceService.createMapping(Profile.ITEM_TYPE, String.format( "{\n" + - " \"profile\": {\n" + - " \"properties\" : {\n" + - " \"scores\": {\n" + - " \"properties\": {\n" + - " \"%s\": {\n" + - " \"type\": \"long\"\n" + - " }\n" + - " }\n" + - " }\n" + - " }\n" + - " }\n" + - "}\n", scoring.getItemId())); + " \"profile\": {\n" + + " \"properties\" : {\n" + + " \"scores\": {\n" + + " \"properties\": {\n" + + " \"%s\": {\n" + + " \"type\": \"long\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n", scoring.getItemId())); updateExistingProfilesForScoring(scoring); } @@ -571,7 +572,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList } private boolean checkScoringDeletionImpact(Condition condition, String scoringToDeleteId) { - if(condition != null) { + if (condition != null) { @SuppressWarnings("unchecked") final List<Condition> subConditions = (List<Condition>) condition.getParameter("subConditions"); if (subConditions != null) { @@ -593,6 +594,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList * Return an updated condition that do not contain a condition on the scoringId anymore * it's remove the unnecessary boolean condition (if a condition is the only one of a boolean the boolean will be remove and the subcondition returned) * it's return null when there is no more condition after (if the condition passed was only a scoring condition on the scoringId) + * * @param 
condition the condition to update * @param scoringId the scoring id to remove in the condition * @return updated condition @@ -604,12 +606,12 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList List<Condition> updatedSubConditions = new LinkedList<>(); for (Condition subCondition : subConditions) { Condition updatedCondition = updateScoringDependentCondition(subCondition, scoringId); - if(updatedCondition != null) { + if (updatedCondition != null) { updatedSubConditions.add(updatedCondition); } } - if(!updatedSubConditions.isEmpty()){ - if(updatedSubConditions.size() == 1) { + if (!updatedSubConditions.isEmpty()) { + if (updatedSubConditions.size() == 1) { return updatedSubConditions.get(0); } else { condition.setParameter("subConditions", updatedSubConditions); @@ -671,7 +673,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList for (Segment segment : impactedSegments) { Condition updatedCondition = updateScoringDependentCondition(segment.getCondition(), scoringId); segment.setCondition(updatedCondition); - if(updatedCondition == null) { + if (updatedCondition == null) { clearAutoGeneratedRules(persistenceService.query("linkedItems", segment.getMetadata().getId(), null, Rule.class), segment.getMetadata().getId()); segment.getMetadata().setEnabled(false); } @@ -741,8 +743,8 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList Set<String> tags = condition.getConditionType().getMetadata().getTags(); if (tags.contains("eventCondition") && !tags.contains("profileCondition")) { try { - Map<String,Object> m = new HashMap<>(3); - m.put("scope",metadata.getScope()); + Map<String, Object> m = new HashMap<>(3); + m.put("scope", metadata.getScope()); m.put("condition", condition); m.put("numberOfDays", parentCondition.getParameter("numberOfDays")); String key = CustomObjectMapper.getObjectMapper().writeValueAsString(m); @@ -750,7 +752,7 @@ public class SegmentServiceImpl implements 
SegmentService, SynchronousBundleList parentCondition.setParameter("generatedPropertyKey", key); Rule rule = rulesService.getRule(key); if (rule == null) { - rule = new Rule(new Metadata(metadata.getScope(), key, "Auto generated rule for "+metadata.getName(), "")); + rule = new Rule(new Metadata(metadata.getScope(), key, "Auto generated rule for " + metadata.getName(), "")); rule.setCondition(condition); rule.getMetadata().setHidden(true); final Action action = new Action(); @@ -809,10 +811,10 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList for (Map.Entry<String, Long> entry : eventCountByProfile.entrySet()) { String profileId = entry.getKey(); if (!profileId.startsWith("_")) { - Map<String,Long> pastEventCounts = new HashMap<>(); + Map<String, Long> pastEventCounts = new HashMap<>(); pastEventCounts.put(propertyKey, entry.getValue()); - Map<String,Object> systemProperties = new HashMap<>(); - systemProperties.put("pastEvents",pastEventCounts); + Map<String, Object> systemProperties = new HashMap<>(); + systemProperties.put("pastEvents", pastEventCounts); try { persistenceService.update(profileId, null, Profile.class, "systemProperties", systemProperties); } catch (Exception e) { @@ -821,7 +823,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList } } - logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis()-t); + logger.info("Profiles past condition updated in {}ms", System.currentTimeMillis() - t); } private void updateExistingProfilesForSegment(Segment segment) { @@ -835,7 +837,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList segmentCondition.setParameter("comparisonOperator", "equals"); segmentCondition.setParameter("propertyValue", segment.getItemId()); - if(segment.getMetadata().isEnabled()) { + if (segment.getMetadata().isEnabled()) { ConditionType booleanConditionType = definitionsService.getConditionType("booleanCondition"); 
ConditionType notConditionType = definitionsService.getConditionType("notCondition"); @@ -904,7 +906,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList } } } - logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis()-t); + logger.info("{} profiles updated in {}ms", updatedProfileCount, System.currentTimeMillis() - t); } private void updateExistingProfilesForScoring(Scoring scoring) { @@ -913,32 +915,31 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList scoringCondition.setConditionType(definitionsService.getConditionType("profilePropertyCondition")); scoringCondition.setParameter("propertyName", "scores." + scoring.getItemId()); scoringCondition.setParameter("comparisonOperator", "exists"); - List<Profile> previousProfiles = persistenceService.query(scoringCondition, null, Profile.class); - HashMap<String, Object> scriptParams = new HashMap<>(); - scriptParams.put("scoringId", scoring.getItemId()); + String[] scripts = new String[scoring.getElements().size() + 1]; + HashMap<String, Object>[] scriptParams = new HashMap[scoring.getElements().size() + 1]; + Condition[] conditions = new Condition[scoring.getElements().size() + 1]; - for (Profile profileToRemove : previousProfiles) { - persistenceService.updateWithScript(profileToRemove.getItemId(), null, Profile.class, "if (ctx._source.systemProperties.scoreModifiers == null) { ctx._source.systemProperties.scoreModifiers=[:] } ; if (ctx._source.systemProperties.scoreModifiers.containsKey(scoringId)) { ctx._source.scores[scoringId] = ctx._source.systemProperties.scoreModifiers[scoringId] } else { ctx._source.scores.remove(scoringId) }", scriptParams); - } - if(scoring.getMetadata().isEnabled()) { - String script = "if (ctx._source.scores == null) { ctx._source.scores=[:] } ; if (ctx._source.scores.containsKey(scoringId)) { ctx._source.scores[scoringId] += scoringValue } else { ctx._source.scores[scoringId] = 
scoringValue }"; - Map<String, Event> updatedProfiles = new HashMap<>(); + scriptParams[0] = new HashMap<String, Object>(); + scriptParams[0].put("scoringId", scoring.getItemId()); + scripts[0] = "if( ctx._source.containsKey(\"systemProperties\") && ctx._source.systemProperties.containsKey(\"scoreModifiers\") && ctx._source.systemProperties.scoreModifiers.containsKey(params.scoringId) ) { ctx._source.scores.put(params.scoringId, ctx._source.systemProperties.scoreModifiers.get(params.scoringId)) } else { ctx._source.scores.remove(params.scoringId) }"; + conditions[0] = scoringCondition; + + if (scoring.getMetadata().isEnabled()) { + String scriptToAdd = "if( !ctx._source.containsKey(\"scores\") ){ ctx._source.put(\"scores\", [:])} if( ctx._source.scores.containsKey(params.scoringId) ) { ctx._source.scores.put(params.scoringId, ctx._source.scores.get(params.scoringId)+params.scoringValue) } else { ctx._source.scores.put(params.scoringId, params.scoringValue) }"; + int idx = 1; for (ScoringElement element : scoring.getElements()) { - scriptParams.put("scoringValue", element.getValue()); - for (Profile p : persistenceService.query(element.getCondition(), null, Profile.class)) { - persistenceService.updateWithScript(p.getItemId(), null, Profile.class, script, scriptParams); - Event profileUpdated = new Event("profileUpdated", null, p, null, null, p, new Date()); - profileUpdated.setPersistent(false); - updatedProfiles.put(p.getItemId(), profileUpdated); - } - } - Iterator<Map.Entry<String, Event>> entries = updatedProfiles.entrySet().iterator(); - while (entries.hasNext()) { - eventService.send(entries.next().getValue()); + scriptParams[idx] = new HashMap<>(); + scriptParams[idx].put("scoringId", scoring.getItemId()); + scriptParams[idx].put("scoringValue", element.getValue()); + scripts[idx] = scriptToAdd; + conditions[idx] = element.getCondition(); + idx++; } } - logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t); + + 
persistenceService.updateWithQueryAndScript(null, Profile.class, scripts, scriptParams, conditions); + logger.info("Profiles updated in {}ms", System.currentTimeMillis() - t); } private void updateExistingProfilesForRemovedScoring(String scoringId) { @@ -955,7 +956,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList for (Profile profileToRemove : previousProfiles) { persistenceService.updateWithScript(profileToRemove.getItemId(), null, Profile.class, "ctx._source.scores.remove(scoringId)", scriptParams); } - logger.info("Profiles updated in {}ms", System.currentTimeMillis()-t); + logger.info("Profiles updated in {}ms", System.currentTimeMillis() - t); } private String getMD5(String md5) { @@ -976,7 +977,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList switch (event.getType()) { case BundleEvent.STARTED: processBundleStartup(event.getBundle().getBundleContext()); - break; + break; case BundleEvent.STOPPING: processBundleStop(event.getBundle().getBundleContext()); break; @@ -1034,7 +1035,7 @@ public class SegmentServiceImpl implements SegmentService, SynchronousBundleList } private <T extends MetadataItem> PartialList<Metadata> getMetadatas(Query query, Class<T> clazz) { - if(query.isForceRefresh()){ + if (query.isForceRefresh()) { persistenceService.refresh(); } definitionsService.resolveConditionType(query.getCondition());