This is an automated email from the ASF dual-hosted git repository. himanshug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 42cf078 Fix memory problem (OOM/FGC) when expression is used in metricsSpec (#7716) 42cf078 is described below commit 42cf07884345fa5fba70a17156678dc6c37ab3d1 Author: BIGrey <huanghui0...@163.com> AuthorDate: Tue May 28 00:46:17 2019 +0800 Fix memory problem (OOM/FGC) when expression is used in metricsSpec (#7716) * AggregatorUtil should cache parsed expression to avoid memory problem (OOM/FGC) when Expression is used in metricsSpec * remove debug log check in Parser.parse * remove cache and use Suppliers.memoize --- .../druid/benchmark/datagen/BenchmarkSchemas.java | 18 +++++ .../druid/query/aggregation/AggregatorUtil.java | 20 ++--- .../aggregation/SimpleDoubleAggregatorFactory.java | 10 ++- .../aggregation/SimpleFloatAggregatorFactory.java | 10 ++- .../aggregation/SimpleLongAggregatorFactory.java | 10 ++- .../incremental/OnheapIncrementalIndexTest.java | 94 ++++++++++++++++++++++ 6 files changed, 139 insertions(+), 23 deletions(-) diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java b/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java index cda9f47..3e35a66 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java @@ -21,6 +21,7 @@ package org.apache.druid.benchmark.datagen; import com.google.common.collect.ImmutableList; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory; @@ -85,6 +86,14 @@ public class BenchmarkSchemas basicSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf")); basicSchemaIngestAggs.add(new 
HyperUniquesAggregatorFactory("hyper", "dimHyperUnique")); + List<AggregatorFactory> basicSchemaIngestAggsExpression = new ArrayList<>(); + basicSchemaIngestAggsExpression.add(new CountAggregatorFactory("rows")); + basicSchemaIngestAggsExpression.add(new LongSumAggregatorFactory("sumLongSequential", null, "if(sumLongSequential>0 && dimSequential>100 || dimSequential<10 || metLongSequential>3000,sumLongSequential,0)", ExprMacroTable.nil())); + basicSchemaIngestAggsExpression.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform")); + basicSchemaIngestAggsExpression.add(new DoubleSumAggregatorFactory("sumFloatNormal", null, "if(sumFloatNormal>0 && dimSequential>100 || dimSequential<10 || metLongSequential>3000,sumFloatNormal,0)", ExprMacroTable.nil())); + basicSchemaIngestAggsExpression.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf")); + basicSchemaIngestAggsExpression.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique")); + Interval basicSchemaDataInterval = Intervals.utc(0, 1000000); BenchmarkSchemaInfo basicSchema = new BenchmarkSchemaInfo( @@ -93,7 +102,16 @@ public class BenchmarkSchemas basicSchemaDataInterval, true ); + + BenchmarkSchemaInfo basicSchemaExpression = new BenchmarkSchemaInfo( + basicSchemaColumns, + basicSchemaIngestAggsExpression, + basicSchemaDataInterval, + true + ); + SCHEMA_MAP.put("basic", basicSchema); + SCHEMA_MAP.put("expression", basicSchemaExpression); } static { // simple single string column and count agg schema, no rollup diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java index bf1edd2..e5e5f51 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -24,8 +24,6 @@ import org.apache.druid.guice.annotations.PublicApi; import 
org.apache.druid.java.util.common.Pair; import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprEval; -import org.apache.druid.math.expr.ExprMacroTable; -import org.apache.druid.math.expr.Parser; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseDoubleColumnValueSelector; import org.apache.druid.segment.BaseFloatColumnValueSelector; @@ -184,9 +182,8 @@ public class AggregatorUtil */ static BaseFloatColumnValueSelector makeColumnValueSelectorWithFloatDefault( final ColumnSelectorFactory metricFactory, - final ExprMacroTable macroTable, @Nullable final String fieldName, - @Nullable final String fieldExpression, + @Nullable final Expr fieldExpression, final float nullValue ) { @@ -196,8 +193,7 @@ public class AggregatorUtil if (fieldName != null) { return metricFactory.makeColumnValueSelector(fieldName); } else { - final Expr expr = Parser.parse(fieldExpression, macroTable); - final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, expr); + final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression); class ExpressionFloatColumnSelector implements FloatColumnSelector { @Override @@ -231,9 +227,8 @@ public class AggregatorUtil */ static BaseLongColumnValueSelector makeColumnValueSelectorWithLongDefault( final ColumnSelectorFactory metricFactory, - final ExprMacroTable macroTable, @Nullable final String fieldName, - @Nullable final String fieldExpression, + @Nullable final Expr fieldExpression, final long nullValue ) { @@ -243,8 +238,7 @@ public class AggregatorUtil if (fieldName != null) { return metricFactory.makeColumnValueSelector(fieldName); } else { - final Expr expr = Parser.parse(fieldExpression, macroTable); - final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, expr); + final ColumnValueSelector<ExprEval> 
baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression); class ExpressionLongColumnSelector implements LongColumnSelector { @Override @@ -276,9 +270,8 @@ public class AggregatorUtil */ static BaseDoubleColumnValueSelector makeColumnValueSelectorWithDoubleDefault( final ColumnSelectorFactory metricFactory, - final ExprMacroTable macroTable, @Nullable final String fieldName, - @Nullable final String fieldExpression, + @Nullable final Expr fieldExpression, final double nullValue ) { @@ -288,8 +281,7 @@ public class AggregatorUtil if (fieldName != null) { return metricFactory.makeColumnValueSelector(fieldName); } else { - final Expr expr = Parser.parse(fieldExpression, macroTable); - final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, expr); + final ColumnValueSelector<ExprEval> baseSelector = ExpressionSelectors.makeExprEvalSelector(metricFactory, fieldExpression); class ExpressionDoubleColumnSelector implements DoubleColumnSelector { @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java index 9dc8a2d..bb3cfa7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java @@ -22,6 +22,9 @@ package org.apache.druid.query.aggregation; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.math.expr.Parser; import org.apache.druid.segment.BaseDoubleColumnValueSelector; @@ -43,6 +46,7 @@ public abstract class SimpleDoubleAggregatorFactory extends 
NullableAggregatorFa protected final String expression; protected final ExprMacroTable macroTable; protected final boolean storeDoubleAsFloat; + protected final Supplier<Expr> fieldExpression; public SimpleDoubleAggregatorFactory( ExprMacroTable macroTable, @@ -56,6 +60,7 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa this.fieldName = fieldName; this.expression = expression; this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat(); + this.fieldExpression = Suppliers.memoize(() -> expression == null ? null : Parser.parse(expression, macroTable)); Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); Preconditions.checkArgument( fieldName == null ^ expression == null, @@ -67,9 +72,8 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa { return AggregatorUtil.makeColumnValueSelectorWithDoubleDefault( metricFactory, - macroTable, fieldName, - expression, + fieldExpression.get(), nullValue ); } @@ -117,7 +121,7 @@ public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFa { return fieldName != null ? 
Collections.singletonList(fieldName) - : Parser.findRequiredBindings(Parser.parse(expression, macroTable)); + : Parser.findRequiredBindings(fieldExpression.get()); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java index c535088..6b43113 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java @@ -22,6 +22,9 @@ package org.apache.druid.query.aggregation; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.math.expr.Parser; import org.apache.druid.segment.BaseFloatColumnValueSelector; @@ -41,6 +44,7 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac @Nullable protected final String expression; protected final ExprMacroTable macroTable; + protected final Supplier<Expr> fieldExpression; public SimpleFloatAggregatorFactory( ExprMacroTable macroTable, @@ -53,6 +57,7 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac this.name = name; this.fieldName = fieldName; this.expression = expression; + this.fieldExpression = Suppliers.memoize(() -> expression == null ? 
null : Parser.parse(expression, macroTable)); Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); Preconditions.checkArgument( fieldName == null ^ expression == null, @@ -64,9 +69,8 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac { return AggregatorUtil.makeColumnValueSelectorWithFloatDefault( metricFactory, - macroTable, fieldName, - expression, + fieldExpression.get(), nullValue ); } @@ -111,7 +115,7 @@ public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFac { return fieldName != null ? Collections.singletonList(fieldName) - : Parser.findRequiredBindings(Parser.parse(expression, macroTable)); + : Parser.findRequiredBindings(fieldExpression.get()); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java index ec7e222..f53a57d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java @@ -22,6 +22,9 @@ package org.apache.druid.query.aggregation; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import org.apache.druid.math.expr.Expr; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.math.expr.Parser; import org.apache.druid.segment.BaseLongColumnValueSelector; @@ -41,6 +44,7 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact @Nullable protected final String expression; protected final ExprMacroTable macroTable; + protected final Supplier<Expr> fieldExpression; public SimpleLongAggregatorFactory( ExprMacroTable macroTable, @@ -53,6 +57,7 @@ public abstract class SimpleLongAggregatorFactory 
extends NullableAggregatorFact this.name = name; this.fieldName = fieldName; this.expression = expression; + this.fieldExpression = Suppliers.memoize(() -> expression == null ? null : Parser.parse(expression, macroTable)); Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); Preconditions.checkArgument( fieldName == null ^ expression == null, @@ -64,9 +69,8 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact { return AggregatorUtil.makeColumnValueSelectorWithLongDefault( metricFactory, - macroTable, fieldName, - expression, + fieldExpression.get(), nullValue ); } @@ -107,7 +111,7 @@ public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFact { return fieldName != null ? Collections.singletonList(fieldName) - : Parser.findRequiredBindings(Parser.parse(expression, macroTable)); + : Parser.findRequiredBindings(fieldExpression.get()); } @Override diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java index 76ce9d7..c5977a0 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java @@ -19,12 +19,17 @@ package org.apache.druid.segment.incremental; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.js.JavaScriptConfig; import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory; import org.apache.druid.query.aggregation.LongMaxAggregator; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; +import 
org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.expression.TestExprMacroTable; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -104,6 +109,95 @@ public class OnheapIncrementalIndexTest } @Test + public void testMultithreadAddFactsUsingExpressionAndJavaScript() throws Exception + { + final IncrementalIndex indexExpr = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(Granularities.MINUTE) + .withMetrics(new LongSumAggregatorFactory( + "oddnum", + null, + "if(value%2==1,1,0)", + TestExprMacroTable.INSTANCE + )) + .withRollup(true) + .build() + ) + .setMaxRowCount(MAX_ROWS) + .buildOnheap(); + + final IncrementalIndex indexJs = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(Granularities.MINUTE) + .withMetrics(new JavaScriptAggregatorFactory( + "oddnum", + ImmutableList.of("value"), + "function(current, value) { if (value%2==1) current = current + 1; return current;}", + "function() {return 0;}", + "function(a, b) { return a + b;}", + JavaScriptConfig.getEnabledInstance() + )) + .withRollup(true) + .build() + ) + .setMaxRowCount(MAX_ROWS) + .buildOnheap(); + + final int addThreadCount = 2; + Thread[] addThreads = new Thread[addThreadCount]; + for (int i = 0; i < addThreadCount; ++i) { + addThreads[i] = new Thread(new Runnable() + { + @Override + public void run() + { + final Random random = ThreadLocalRandom.current(); + try { + for (int j = 0; j < MAX_ROWS / addThreadCount; ++j) { + int randomInt = random.nextInt(100000); + MapBasedInputRow mapBasedInputRowExpr = new MapBasedInputRow( + 0, + Collections.singletonList("billy"), + ImmutableMap.of("billy", randomInt % 3, "value", randomInt) + ); + MapBasedInputRow mapBasedInputRowJs = new MapBasedInputRow( + 0, + Collections.singletonList("billy"), + ImmutableMap.of("billy", randomInt % 3, "value", randomInt) + ); + 
indexExpr.add(mapBasedInputRowExpr); + indexJs.add(mapBasedInputRowJs); + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + addThreads[i].start(); + } + + for (int i = 0; i < addThreadCount; ++i) { + addThreads[i].join(); + } + + long exprSum = 0; + long jsSum = 0; + + for (IncrementalIndexRow row : indexExpr.getFacts().keySet()) { + exprSum += indexExpr.getMetricLongValue(row.getRowIndex(), 0); + } + + for (IncrementalIndexRow row : indexJs.getFacts().keySet()) { + jsSum += indexJs.getMetricLongValue(row.getRowIndex(), 0); + } + + Assert.assertEquals(exprSum, jsSum); + } + + @Test public void testOnHeapIncrementalIndexClose() throws Exception { // Prepare the mocks & set close() call count expectation to 1 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org