(druid) branch master updated (48b8d42698b -> 0cc54e08363)
This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
    from 48b8d42698b fix regexp_like, contains_string, icontains_string to return null instead of false for null inputs in sql compatible mode (#15963)
     add 0cc54e08363 Fix workflow labeler parameter to match the correct status img (#16142)
No new revisions were added by this update.
Summary of changes:
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
-
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
Re: [PR] Fix workflow labeler parameter to match the correct status img (druid)
kfaraz merged PR #16142: URL: https://github.com/apache/druid/pull/16142 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add manifest file for MSQ export (druid)
LakshSingla commented on code in PR #15953: URL: https://github.com/apache/druid/pull/15953#discussion_r1531505943 ## docs/multi-stage-query/reference.md: ## @@ -99,6 +99,17 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte This variation of EXTERN requires one argument, the details of the destination as specified below. This variation additionally requires an `AS` clause to specify the format of the exported rows. +While exporting data, some metadata files will also be created at the destination in addition to the data. These files will be created in a directory `_symlink_format_manifest`. +- `_symlink_format_manifest/manifest`: Lists the files which were created as part of the export. The file is in the symlink manifest format, and consists of a list of absolute paths to the files created. Review Comment: What is the symlink manifest format? I wasn't able to find a definitive answer while searching "symlink manifest format", therefore some clarification would be helpful. Also, is it for Druid's internal use, or can other systems and operators make use of the manifest file created? ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java: ## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.storage.ExportStorageProvider; +import org.apache.druid.storage.StorageConnector; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * Manages writing of metadata files during export queries. + */ +public class ExportMetadataManager +{ + public static final String SYMLINK_DIR = "_symlink_format_manifest"; + public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest"; + public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta"; + public static final int MANIFEST_FILE_VERSION = 1; + private static final Logger log = new Logger(ExportMetadataManager.class); + private final ExportStorageProvider exportStorageProvider; + + public ExportMetadataManager(final ExportStorageProvider exportStorageProvider) + { +this.exportStorageProvider = exportStorageProvider; + } + + public void writeMetadata(List exportedFiles) throws IOException + { +final StorageConnector storageConnector = exportStorageProvider.get(); +log.info("Writing manifest file at [%s]", exportStorageProvider.getBasePath()); Review Comment: nit: The sentence should make sense when reading without the interpolation ```suggestion log.info("Writing manifest file at location[%s]", exportStorageProvider.getBasePath()); ``` ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java: ## @@ -107,7 +112,14 @@ public ProcessorsAndChannels makeProcessors( ); if (inputSliceReader.numReadableInputs(slice) == 0) { - return new ProcessorsAndChannels<>(ProcessorManagers.none(), 
OutputChannels.none()); + return new ProcessorsAndChannels<>( + ProcessorManagers.of(Sequences.empty()) + .withAccumulation(new ArrayList(), (acc, file) -> { + ((ArrayList) acc).add((String) file); + return acc; + }), Review Comment: Seems confusing, do we need this? ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java: ## @@ -88,7 +93,7 @@ public ExportStorageProvider getExportStorageProvider() } @Override - public ProcessorsAndChannels makeProcessors( + public ProcessorsAndChannels makeProcessors( Review Comment: Why not ```suggestion public ProcessorsAndChannels> makeProcessors( ``` ##
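The documentation hunk quoted in this review describes the manifest as a list of absolute paths to the exported files. A minimal sketch of that idea follows, assuming one path per line (the quoted text does not confirm the exact layout). `ManifestSketch` is a hypothetical stand-in, not the `ExportMetadataManager` from the PR: it writes to any `java.io.Writer` rather than going through Druid's `StorageConnector`, and it does not cover the `druid_export_meta` file.

```java
import java.io.PrintWriter;
import java.io.Writer;
import java.util.List;

// Hypothetical illustration only; not the class under review.
class ManifestSketch
{
  static final String SYMLINK_DIR = "_symlink_format_manifest";
  static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";

  // Writes each exported file path on its own line (assumed layout).
  static void writeManifest(Writer out, List<String> exportedFiles)
  {
    final PrintWriter writer = new PrintWriter(out);
    for (String file : exportedFiles) {
      writer.print(file);
      writer.print('\n');
    }
    // Flush so the caller sees the content; the caller owns closing the writer.
    writer.flush();
  }
}
```

A reader of such a file would only need to split on newlines to recover the list of exported files, which is presumably why the reviewer asks whether external systems are expected to consume it.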
(druid) branch master updated (2b23d0b5b52 -> 48b8d42698b)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
    from 2b23d0b5b52 MSQ: Controller checker should check for "closed" only. (#16161)
     add 48b8d42698b fix regexp_like, contains_string, icontains_string to return null instead of false for null inputs in sql compatible mode (#15963)
No new revisions were added by this update.
Summary of changes:
 .../java/org/apache/druid/query/expression/ContainsExpr.java | 2 +-
 .../apache/druid/query/expression/RegexpLikeExprMacro.java | 2 +-
 .../druid/query/expression/CaseInsensitiveExprMacroTest.java | 12 ++--
 .../apache/druid/query/expression/ContainsExprMacroTest.java | 9 +
 .../druid/query/expression/RegexpLikeExprMacroTest.java | 9 +
 .../apache/druid/sql/calcite/expression/ExpressionsTest.java | 6 +++---
 6 files changed, 21 insertions(+), 19 deletions(-)
Re: [PR] fix REGEXP_LIKE, CONTAINS_STRING, ICONTAINS_STRING for correct 3vl behavior (druid)
clintropolis merged PR #15963: URL: https://github.com/apache/druid/pull/15963
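For context on the "3vl" (three-valued logic) wording in this thread: the commit message says the functions now return null instead of false for null inputs in SQL-compatible mode. The sketch below illustrates why that distinction matters. `ThreeValuedContains` is a hypothetical helper, not Druid's `ContainsExpr`, and treating a null search string the same as a null input is an assumption.

```java
// Hypothetical illustration of SQL three-valued logic; not Druid code.
class ThreeValuedContains
{
  // Returns null (unknown) when an input is null, otherwise a definite TRUE/FALSE.
  static Boolean containsString(String haystack, String needle)
  {
    if (haystack == null || needle == null) {
      return null;
    }
    return haystack.contains(needle);
  }

  // Three-valued NOT: unknown stays unknown.
  static Boolean not(Boolean value)
  {
    return value == null ? null : !value;
  }
}
```

If a null input produced false, then `NOT contains_string(col, 'x')` would evaluate to true for rows where `col` is null and those rows would match a filter. With null propagated, the negation is also null and the row is not matched.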
Re: [PR] Window function on msq (druid)
somu-imply commented on code in PR #15470: URL: https://github.com/apache/druid/pull/15470#discussion_r1531503876 ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java: ## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.KeyColumn; +import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.msq.input.stage.StageInputSpec; +import org.apache.druid.msq.kernel.HashShuffleSpec; +import org.apache.druid.msq.kernel.QueryDefinition; +import org.apache.druid.msq.kernel.QueryDefinitionBuilder; +import org.apache.druid.msq.kernel.ShuffleSpec; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.Query; +import org.apache.druid.query.operator.ColumnWithDirection; +import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; +import org.apache.druid.query.operator.NaiveSortOperatorFactory; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.query.operator.window.WindowOperatorFactory; +import org.apache.druid.segment.column.RowSignature; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class WindowOperatorQueryKit implements QueryKit +{ + private final ObjectMapper jsonMapper; + + public WindowOperatorQueryKit(ObjectMapper jsonMapper) + { +this.jsonMapper = jsonMapper; + } + + @Override + public QueryDefinition makeQueryDefinition( + String queryId, + WindowOperatorQuery originalQuery, + QueryKit> queryKit, + ShuffleSpecFactory resultShuffleSpecFactory, + int maxWorkerCount, + int minStageNumber + ) + { +// need to validate query first +// populate the group of operators to be processed as each stage +// the size of the operators is the number of serialized stages +// later we should also check if these can be parallelized +// check there is an empty over clause or not +List> operatorList = new 
ArrayList<>(); +boolean status = validateAndReturnOperatorList(originalQuery, operatorList); Review Comment: Added javadocs and updated variable and function name to make more sense
Re: [PR] Window function on msq (druid)
somu-imply commented on code in PR #15470: URL: https://github.com/apache/druid/pull/15470#discussion_r1531504063 ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ## @@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition( partitionBoost ); -queryDefBuilder.add( -StageDefinition.builder(firstStageNumber + 1) - .inputs(new StageInputSpec(firstStageNumber)) - .signature(resultSignature) - .maxWorkerCount(maxWorkerCount) - .shuffleSpec( - shuffleSpecFactoryPostAggregation != null - ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false) - : null - ) - .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun)) -); +// the result signature might change +// if window shufle spec is added +// say the output signature was d0, d1 +// But shuffle spec for window was d1 +// create the shufflespec from the column in the context +// and sort after wards to ensure prefix of shuffle is in row signature +final ShuffleSpec nextShuffleWindowSpec; +if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { + final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); + nextShuffleWindowSpec = new HashShuffleSpec( + windowClusterBy, + maxWorkerCount + ); +} else { + nextShuffleWindowSpec = null; Review Comment: Yes refactored in that way -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
Re: [PR] Window function on msq (druid)
somu-imply commented on code in PR #15470: URL: https://github.com/apache/druid/pull/15470#discussion_r1531503758 ## processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.rowsandcols.concrete; + +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.read.columnar.FrameColumnReaders; +import org.apache.druid.frame.segment.FrameStorageAdapter; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.semantic.WireTransferable; +import org.apache.druid.segment.CloseableShapeshifter; +import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.LinkedHashMap; + +public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +{ + private final Frame frame; + private final RowSignature signature; + private final LinkedHashMap colCache = new LinkedHashMap<>(); + + public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature) + { +this.frame = FrameType.COLUMNAR.ensureType(frame); +this.signature = signature; + } + + @Override + public Collection getColumnNames() + { +return signature.getColumnNames(); + } + + @Override + public int numRows() + { +return frame.numRows(); + } + + @Nullable + @Override + public Column findColumn(String name) + { +// Use contains so that we can negative cache. 
+if (!colCache.containsKey(name)) { + final int columnIndex = signature.indexOf(name); + if (columnIndex < 0) { +colCache.put(name, null); + } else { +final ColumnType columnType = signature +.getColumnType(columnIndex) +.orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); + +colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); + } +} +return colCache.get(name); + + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public T as(Class clazz) + { +if (StorageAdapter.class.equals(clazz)) { + return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); +} +if (WireTransferable.class.equals(clazz)) { Review Comment: WireTransferable is used in native to group results across historicals in leaf processor. We do not need it in MSQ as during planning we avoid leaf operators. This code is used by the native windowing as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
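The `findColumn` method quoted in this diff checks `containsKey` rather than testing `get` for null, so that a failed lookup is remembered too ("negative caching"). A minimal, self-contained sketch of that pattern follows; `NegativeCache` and its `loads` counter are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of negative caching; not the class under review.
class NegativeCache<V>
{
  private final Map<String, V> cache = new HashMap<>();
  private final Function<String, V> loader;
  // Counts how often the loader actually ran, to make the caching visible.
  int loads = 0;

  NegativeCache(Function<String, V> loader)
  {
    this.loader = loader;
  }

  V find(String name)
  {
    // containsKey separates "never looked up" from "looked up and absent (null)".
    if (!cache.containsKey(name)) {
      loads++;
      cache.put(name, loader.apply(name));
    }
    return cache.get(name);
  }
}
```

Note that `Map.computeIfAbsent` would not work as a drop-in replacement here, because it records no mapping when the function returns null, so missing names would be looked up again on every call.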
Re: [PR] add new typed in filter (druid)
gianm commented on code in PR #16039: URL: https://github.com/apache/druid/pull/16039#discussion_r1531475670 ## processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java: ## @@ -258,6 +265,70 @@ public T computeBitmapResult(BitmapResultFactory bitmapResultFactory, boo } } + private final class DoubleValueSetIndexes implements ValueSetIndexes + { +final FixedIndexed dictionary = doubleDictionarySupplier.get(); + +@Nullable +@Override +public BitmapColumnIndex forSortedValues(@Nonnull List sortedValues, TypeSignature matchValueType) +{ + final boolean matchNull = sortedValues.get(0) == null; + final Supplier unknownsIndex = () -> { +if (!matchNull && dictionary.get(0) == null) { + return valueIndexes.get(0); +} +return null; + }; + + // values are doubles and ordered in double order + if (matchValueType.is(ValueType.DOUBLE)) { Review Comment: Is it possible to minimize the copied code between here and the `LONG` version, such as by using a shared helper somewhere? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
Re: [PR] add new typed in filter (druid)
gianm commented on code in PR #16039: URL: https://github.com/apache/druid/pull/16039#discussion_r1530892864 ## sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/ArrayOverlapOperatorConversion.java: ## @@ -127,7 +130,7 @@ public DimFilter toDruidFilter( // to create an empty array with no argument, we just return null. return null; } else if (arrayElements.length == 1) { -if (plannerContext.isUseBoundsAndSelectors()) { +if (plannerContext.isUseBoundsAndSelectors() || (!simpleExtractionExpr.isDirectColumnAccess() && virtualColumnRegistry == null)) { Review Comment: when is `virtualColumnRegistry` null? & if it can be null, then I think CodeQL makes a good point: could it be null in the `else` branch as well, where there is no `null` guard? ## processing/src/main/java/org/apache/druid/segment/index/semantic/ValueSetIndexes.java: ## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.index.semantic; + +import com.google.common.base.Supplier; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import org.apache.druid.collections.bitmap.BitmapFactory; +import org.apache.druid.collections.bitmap.ImmutableBitmap; +import org.apache.druid.segment.column.TypeSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.index.BitmapColumnIndex; +import org.apache.druid.segment.index.SimpleImmutableBitmapDelegatingIterableIndex; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public interface ValueSetIndexes +{ + double SORTED_MERGE_RATIO_THRESHOLD = 0.12D; + int SIZE_WORTH_CHECKING_MIN = 8; + + /** + * Get the wrapped {@link ImmutableBitmap} corresponding to the specified set of values (if they are contained in the + * underlying column). The set must be sorted using the comparator of the supplied matchValueType. + * + * @param sortedValues values to match, sorted in matchValueType order + * @param matchValueType type of the value to match, used to assist conversion from the match value type to the column + * value type + * @return {@link ImmutableBitmap} corresponding to the rows which match the values, or null if an index + * connot be computed for the supplied value type + */ + @Nullable + BitmapColumnIndex forSortedValues(@Nonnull List sortedValues, TypeSignature matchValueType); + + static BitmapColumnIndex getIndexFromSortedIteratorSortedMerged( Review Comment: Javadoc for this and `getIndexFromSortedIterator`? For some reason, the names don't make it obvious to me what they do. (One is regular and one is `SortedMerged`?) 
Reading the code, it looks like this one does a linear scan through the dictionary, and the other one does a series of binary searches. Maybe call them `getIndexFromSortedIteratorWithScan` and `getIndexFromSortedIteratorWithBinarySearch`. Oh, now I see why it's called `SortedMerged`. The zipping is kind of like the merge step of a merge-sort. Still, I feel the "scan" and "binary search" names would make more sense. And Javadoc would definitely help. ## processing/src/main/java/org/apache/druid/segment/index/semantic/ValueSetIndexes.java: ## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + *
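The two strategies described in this review comment (a linear, merge-style scan through the dictionary versus a series of binary searches) can be sketched as below. This is a simplified illustration over plain sorted string lists, assuming distinct, non-null values on both sides; `SortedLookupSketch` and its method names are hypothetical and do not reproduce the bitmap handling in `ValueSetIndexes`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration; returns dictionary positions of the matched values.
class SortedLookupSketch
{
  // Merge-style scan: walk both sorted lists in lockstep, O(n + m) comparisons.
  static List<Integer> withScan(List<String> dictionary, List<String> sortedValues)
  {
    final List<Integer> hits = new ArrayList<>();
    int d = 0;
    int v = 0;
    while (d < dictionary.size() && v < sortedValues.size()) {
      final int cmp = dictionary.get(d).compareTo(sortedValues.get(v));
      if (cmp == 0) {
        hits.add(d);
        d++;
        v++;
      } else if (cmp < 0) {
        d++;
      } else {
        v++;
      }
    }
    return hits;
  }

  // One binary search per value: O(m log n) comparisons.
  static List<Integer> withBinarySearch(List<String> dictionary, List<String> sortedValues)
  {
    final List<Integer> hits = new ArrayList<>();
    for (String value : sortedValues) {
      final int index = Collections.binarySearch(dictionary, value);
      if (index >= 0) {
        hits.add(index);
      }
    }
    return hits;
  }
}
```

The scan tends to pay off when the value set is large relative to the dictionary, and the binary searches when it is small. The constants `SORTED_MERGE_RATIO_THRESHOLD` and `SIZE_WORTH_CHECKING_MIN` in the quoted interface suggest a size-based choice, but the exact rule is not shown in this excerpt.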
Re: [PR] Window function on msq (druid)
somu-imply commented on code in PR #15470: URL: https://github.com/apache/druid/pull/15470#discussion_r1531503326 ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java: ## @@ -140,6 +141,20 @@ public QueryDefinition makeQueryDefinition( // Add partition boosting column. clusterByColumns.add(new KeyColumn(QueryKitUtils.PARTITION_BOOST_COLUMN, KeyOrder.ASCENDING)); signatureBuilder.add(QueryKitUtils.PARTITION_BOOST_COLUMN, ColumnType.LONG); + final RowSignature signatureSoFar = signatureBuilder.build(); + boolean addShuffle = true; + if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) { +final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL); +for (KeyColumn c : windowClusterBy.getColumns()) { + if (!signatureSoFar.contains(c.columnName())) { +addShuffle = false; +break; + } +} +if (addShuffle) { + clusterByColumns.addAll(windowClusterBy.getColumns()); Review Comment: Updated the code, need to discuss more on cases where there will be a window over a boosted scan column -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
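The loop in the quoted hunk adds the window cluster-by columns only when every one of them already appears in the signature built so far. A compact sketch of the same check, using plain strings instead of Druid's `KeyColumn` and `RowSignature` types (`WindowShuffleSketch` and its parameter names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; column names stand in for KeyColumn objects.
class WindowShuffleSketch
{
  // Returns a new list with the window columns appended only if all of them
  // are present in the signature; otherwise returns a copy of the input list.
  static List<String> maybeAddWindowColumns(
      List<String> clusterBy,
      Set<String> signatureColumns,
      List<String> windowColumns
  )
  {
    final List<String> result = new ArrayList<>(clusterBy);
    if (signatureColumns.containsAll(windowColumns)) {
      result.addAll(windowColumns);
    }
    return result;
  }
}
```

Returning a copy rather than appending to the caller's list is a choice made for this sketch, so that the input is left untouched.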
Re: [PR] Window function on msq (druid)
somu-imply commented on code in PR #15470: URL: https://github.com/apache/druid/pull/15470#discussion_r1531502915 ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java: ## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.OutputChannel; +import org.apache.druid.frame.processor.OutputChannelFactory; +import org.apache.druid.frame.processor.OutputChannels; +import org.apache.druid.frame.processor.manager.ProcessorManagers; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.msq.counters.CounterTracker; +import org.apache.druid.msq.input.InputSlice; +import org.apache.druid.msq.input.InputSliceReader; +import org.apache.druid.msq.input.ReadableInput; +import org.apache.druid.msq.input.stage.ReadablePartition; +import org.apache.druid.msq.input.stage.StageInputSlice; +import org.apache.druid.msq.kernel.FrameContext; +import org.apache.druid.msq.kernel.ProcessorsAndChannels; +import org.apache.druid.msq.kernel.StageDefinition; +import org.apache.druid.query.operator.OperatorFactory; +import org.apache.druid.query.operator.WindowOperatorQuery; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + +@JsonTypeName("window") +public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessorFactory +{ + private final WindowOperatorQuery query; + private final List operatorList; + private final RowSignature stageRowSignature; + private final boolean isEmptyOver; + + @JsonCreator + 
public WindowOperatorQueryFrameProcessorFactory( + @JsonProperty("query") WindowOperatorQuery query, + @JsonProperty("operatorList") List operatorFactoryList, + @JsonProperty("stageRowSignature") RowSignature stageRowSignature, + @JsonProperty("emptyOver") boolean emptyOver + ) + { +this.query = Preconditions.checkNotNull(query, "query"); +this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator"); +this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature"); +this.isEmptyOver = emptyOver; + } + + @JsonProperty("query") + public WindowOperatorQuery getQuery() + { +return query; + } + + @JsonProperty("operatorList") + public List getOperators() + { +return operatorList; + } + + @JsonProperty("stageRowSignature") + public RowSignature getSignature() + { +return stageRowSignature; + } + + @JsonProperty("emptyOver") + public boolean isEmptyOverFound() + { +return isEmptyOver; + } + + @Override + public ProcessorsAndChannels makeProcessors( + StageDefinition stageDefinition, + int workerNumber, + List inputSlices, + InputSliceReader inputSliceReader, + @Nullable Object extra, + OutputChannelFactory outputChannelFactory, + FrameContext frameContext, + int maxOutstandingProcessors, + CounterTracker counters, + Consumer warningPublisher + ) + { +// Expecting a single input slice from some prior stage. +final StageInputSlice slice = (StageInputSlice) Iterables.getOnlyElement(inputSlices); +final Int2ObjectSortedMap outputChannels = new Int2ObjectAVLTreeMap<>(); + +for (final ReadablePartition partition : slice.getPartitions()) { + outputChannels.computeIfAbsent( + partition.getPartitionNumber(), + i -> { +try { + return
Re: [PR] Window function on msq (druid)
somu-imply commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1531502826

## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ##
@@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition(
         partitionBoost
     );

-    queryDefBuilder.add(
-        StageDefinition.builder(firstStageNumber + 1)
-                       .inputs(new StageInputSpec(firstStageNumber))
-                       .signature(resultSignature)
-                       .maxWorkerCount(maxWorkerCount)
-                       .shuffleSpec(
-                           shuffleSpecFactoryPostAggregation != null
-                           ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
-                           : null
-                       )
-                       .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
-    );
+    // the result signature might change
+    // if window shufle spec is added
+    // say the output signature was d0, d1
+    // But shuffle spec for window was d1
+    // create the shufflespec from the column in the context
+    // and sort after wards to ensure prefix of shuffle is in row signature
+    final ShuffleSpec nextShuffleWindowSpec;
+    if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
+      final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
+      nextShuffleWindowSpec = new HashShuffleSpec(
+          windowClusterBy,
+          maxWorkerCount
+      );
+    } else {
+      nextShuffleWindowSpec = null;
+    }
+    final ShuffleSpec stageShuffleSpec;
+    if (shuffleSpecFactoryPostAggregation != null) {
+      List<KeyColumn> columns = resultClusterBy.getColumns();
+      if (nextShuffleWindowSpec != null) {
+        columns.addAll(nextShuffleWindowSpec.clusterBy().getColumns());

Review Comment:
   Updated. At the moment, when there is a window I am not using boosting. I would like to discuss this a bit to understand in which cases the boosted column will be present. Ideally, I do not want the boosted column in my shuffle spec, as that would be incorrect for windowing.
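The quoted hunk appends the window's partition columns to the list returned by `resultClusterBy.getColumns()`. If that list happens to be shared or immutable, appending in place would either leak the change or fail. The following is only a minimal sketch of the copy-then-append alternative; it uses plain strings instead of Druid's `KeyColumn`, and the class and method names are hypothetical, not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CombinedClusterBySketch
{
  // Hypothetical helper: builds a NEW list holding the result columns followed by
  // the window partition columns, leaving both inputs untouched.
  static List<String> combineKeyColumns(List<String> resultColumns, List<String> windowColumns)
  {
    final List<String> combined = new ArrayList<>(resultColumns);
    if (windowColumns != null) {
      combined.addAll(windowColumns);
    }
    return combined;
  }

  public static void main(String[] args)
  {
    // Mirrors the example in the diff comment: output signature d0, d1; window shuffle on d1.
    final List<String> resultColumns = Collections.unmodifiableList(Arrays.asList("d0", "d1"));
    final List<String> windowColumns = Collections.singletonList("d1");

    final List<String> combined = combineKeyColumns(resultColumns, windowColumns);
    System.out.println("combined = " + combined);
    System.out.println("original = " + resultColumns);
  }
}
```

The sketch does not deduplicate columns, matching the quoted hunk; whether duplicates are acceptable in a real cluster-by key is a question for the PR itself.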
Re: [PR] Window function on msq (druid)
somu-imply commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1531501731

## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java: ##
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.NaiveSortOperatorFactory;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
+{
+  private final ObjectMapper jsonMapper;
+
+  public WindowOperatorQueryKit(ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public QueryDefinition makeQueryDefinition(
+      String queryId,
+      WindowOperatorQuery originalQuery,
+      QueryKit<Query<?>> queryKit,
+      ShuffleSpecFactory resultShuffleSpecFactory,
+      int maxWorkerCount,
+      int minStageNumber
+  )
+  {
+    // need to validate query first
+    // populate the group of operators to be processed as each stage
+    // the size of the operators is the number of serialized stages
+    // later we should also check if these can be parallelized
+    // check there is an empty over clause or not
+    List<List<OperatorFactory>> operatorList = new ArrayList<>();
+    boolean status = validateAndReturnOperatorList(originalQuery, operatorList);
+
+
+    ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
+    // add this shuffle spec to the last stage of the inner query
+
+    final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder().queryId(queryId);
+    if (nextShuffleSpec != null) {
+      final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
+      originalQuery = (WindowOperatorQuery) originalQuery.withOverriddenContext(ImmutableMap.of(
+          MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
+          windowClusterBy
+      ));
+    }
+    final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
+        queryKit,
+        queryId,
+        originalQuery.context(),
+        originalQuery.getDataSource(),
+        originalQuery.getQuerySegmentSpec(),
+        originalQuery.getFilter(),
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        false
+    );
+
+    dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);
+
+    final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
+    final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
+    RowSignature rowSignature = queryToRun.getRowSignature();
+
+
+    if (status) {
+      // empty over clause found
+      // moving everything to a single partition
+      queryDefBuilder.add(
+          StageDefinition.builder(firstStageNumber)
+                         .inputs(new StageInputSpec(firstStageNumber - 1))

Review Comment:
   Because the window uses the previous stage as its input.

##
Re: [PR] Window function on msq (druid)
somu-imply commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1531502304

## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ##
@@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition(
         partitionBoost
     );

-    queryDefBuilder.add(
-        StageDefinition.builder(firstStageNumber + 1)
-                       .inputs(new StageInputSpec(firstStageNumber))
-                       .signature(resultSignature)
-                       .maxWorkerCount(maxWorkerCount)
-                       .shuffleSpec(
-                           shuffleSpecFactoryPostAggregation != null
-                           ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
-                           : null
-                       )
-                       .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
-    );
+    // the result signature might change
+    // if window shufle spec is added
+    // say the output signature was d0, d1
+    // But shuffle spec for window was d1
+    // create the shufflespec from the column in the context
+    // and sort after wards to ensure prefix of shuffle is in row signature
+    final ShuffleSpec nextShuffleWindowSpec;
+    if (originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
+      final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
+      nextShuffleWindowSpec = new HashShuffleSpec(
+          windowClusterBy,
+          maxWorkerCount
+      );
+    } else {
+      nextShuffleWindowSpec = null;
+    }
+    final ShuffleSpec stageShuffleSpec;
+    if (shuffleSpecFactoryPostAggregation != null) {
+      List<KeyColumn> columns = resultClusterBy.getColumns();
+      if (nextShuffleWindowSpec != null) {
+        columns.addAll(nextShuffleWindowSpec.clusterBy().getColumns());
+        // Creating a new cluster by with the columns from existing
+        // plus the columns from the next window partition column
+        final ClusterBy tmp = new ClusterBy(columns, resultClusterBy.getBucketByCount());
+        stageShuffleSpec = shuffleSpecFactoryPostAggregation.build(tmp, false);
+      } else {
+        stageShuffleSpec = shuffleSpecFactoryPostAggregation.build(resultClusterBy, false);
+      }
+    } else {
+      stageShuffleSpec = nextShuffleWindowSpec;
+    }
+    final RowSignature stageSignature;
+    if (stageShuffleSpec == null) {
+      stageSignature = resultSignature;
+    } else {
+      // sort the signature to make sure the prefix is aligned
+      stageSignature = QueryKitUtils.sortableSignature(
+          resultSignature,
+          stageShuffleSpec.clusterBy().getColumns()
+      );
+    }
     if (doLimitOrOffset) {
+      queryDefBuilder.add(
+          StageDefinition.builder(firstStageNumber + 1)
+                         .inputs(new StageInputSpec(firstStageNumber))
+                         .signature(resultSignature)
+                         .maxWorkerCount(maxWorkerCount)
+                         .shuffleSpec(
+                             shuffleSpecFactoryPostAggregation != null
+                             ? shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
+                             : null
+                         )
+                         .processorFactory(new GroupByPostShuffleFrameProcessorFactory(queryToRun))
+      );
       final DefaultLimitSpec limitSpec = (DefaultLimitSpec) queryToRun.getLimitSpec();
       queryDefBuilder.add(
           StageDefinition.builder(firstStageNumber + 2)
                          .inputs(new StageInputSpec(firstStageNumber + 1))
                          .signature(resultSignature)
                          .maxWorkerCount(1)
-                         .shuffleSpec(null) // no shuffling should be required after a limit processor.
+                         // no shuffling should be required after a limit processor.
+                         // but need one if the next stage is a window with a partition by
+                         .shuffleSpec(nextShuffleWindowSpec)

Review Comment:
   Refactored the query kit so that the old code runs when the next window shuffle spec is null; otherwise, the new code path is used.

## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java: ##
@@ -164,34 +168,88 @@ public QueryDefinition makeQueryDefinition(
         partitionBoost
     );

-    queryDefBuilder.add(
-        StageDefinition.builder(firstStageNumber + 1)
-                       .inputs(new StageInputSpec(firstStageNumber))
-                       .signature(resultSignature)
-                       .maxWorkerCount(maxWorkerCount)
-                       .shuffleSpec(
-                           shuffleSpecFactoryPostAggregation != null
-
Re: [PR] Window function on msq (druid)
somu-imply commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1531501603

## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java: ##
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.operator.ColumnWithDirection;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.NaiveSortOperatorFactory;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
+{
+  private final ObjectMapper jsonMapper;
+
+  public WindowOperatorQueryKit(ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public QueryDefinition makeQueryDefinition(
+      String queryId,
+      WindowOperatorQuery originalQuery,
+      QueryKit<Query<?>> queryKit,
+      ShuffleSpecFactory resultShuffleSpecFactory,
+      int maxWorkerCount,
+      int minStageNumber
+  )
+  {
+    // need to validate query first
+    // populate the group of operators to be processed as each stage
+    // the size of the operators is the number of serialized stages
+    // later we should also check if these can be parallelized
+    // check there is an empty over clause or not
+    List<List<OperatorFactory>> operatorList = new ArrayList<>();
+    boolean status = validateAndReturnOperatorList(originalQuery, operatorList);
+
+
+    ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
+    // add this shuffle spec to the last stage of the inner query
+
+    final QueryDefinitionBuilder queryDefBuilder = QueryDefinition.builder().queryId(queryId);
+    if (nextShuffleSpec != null) {
+      final ClusterBy windowClusterBy = nextShuffleSpec.clusterBy();
+      originalQuery = (WindowOperatorQuery) originalQuery.withOverriddenContext(ImmutableMap.of(
+          MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL,
+          windowClusterBy
+      ));
+    }
+    final DataSourcePlan dataSourcePlan = DataSourcePlan.forDataSource(
+        queryKit,
+        queryId,
+        originalQuery.context(),
+        originalQuery.getDataSource(),
+        originalQuery.getQuerySegmentSpec(),
+        originalQuery.getFilter(),
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        false
+    );
+
+    dataSourcePlan.getSubQueryDefBuilder().ifPresent(queryDefBuilder::addAll);
+
+    final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
+    final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
+    RowSignature rowSignature = queryToRun.getRowSignature();
+
+
+    if (status) {
+      // empty over clause found
+      // moving everything to a single partition
+      queryDefBuilder.add(
+          StageDefinition.builder(firstStageNumber)
+                         .inputs(new StageInputSpec(firstStageNumber - 1))
+                         .signature(rowSignature)
+                         .maxWorkerCount(maxWorkerCount)
+
(druid) branch master updated: MSQ: Controller checker should check for "closed" only. (#16161)
This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git

The following commit(s) were added to refs/heads/master by this push:
     new 2b23d0b5b52 MSQ: Controller checker should check for "closed" only. (#16161)
2b23d0b5b52 is described below

commit 2b23d0b5b52f1184e1642c5b5b15af4f4aa216ad
Author: Gian Merlino
AuthorDate: Tue Mar 19 19:25:48 2024 -0700

    MSQ: Controller checker should check for "closed" only. (#16161)

    * MSQ: Controller checker should check for "closed" only.

    Currently, the worker's controller checker will exit the worker if
    the controller location is "closed" (no longer running) or if its location
    is empty (i.e. location unknown). This patch changes to only exit on "closed".

    We shouldn't exit on empty location, because that may happen if the Overlord
    is slow to acknowledge the location of a task.

    * Fix test.
---
 .../apache/druid/msq/indexing/IndexerWorkerContext.java | 4 +++-
 .../druid/msq/indexing/IndexerWorkerContextTest.java    | 17 ++---
 2 files changed, 5 insertions(+), 16 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 53cd6e942ea..1bd789df769 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -190,7 +190,9 @@ public class IndexerWorkerContext implements WorkerContext
         break;
       }

-      if (controllerLocations.isClosed() || controllerLocations.getLocations().isEmpty()) {
+      // Note: don't exit on empty location, because that may happen if the Overlord is slow to acknowledge the
+      // location of a task. Only exit on "closed", because that happens only if the task is really no longer running.
+      if (controllerLocations.isClosed()) {
         log.warn(
             "Periodic fetch of controller location returned [%s]. Worker task [%s] will exit.",
             controllerLocations,
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
index 2ae8d155d4d..583c21d3407 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
@@ -54,20 +54,6 @@ public class IndexerWorkerContextTest
     );
   }

-  @Test
-  public void testControllerCheckerRunnableExitsWhenEmptyStatus()
-  {
-    final ServiceLocator controllerLocatorMock = Mockito.mock(ServiceLocator.class);
-    Mockito.when(controllerLocatorMock.locate())
-           .thenReturn(Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet())));
-
-    final Worker workerMock = Mockito.mock(Worker.class);
-
-    indexerWorkerContext.controllerCheckerRunnable(controllerLocatorMock, workerMock);
-    Mockito.verify(controllerLocatorMock, Mockito.times(1)).locate();
-    Mockito.verify(workerMock, Mockito.times(1)).controllerFailed();
-  }
-
   @Test
   public void testControllerCheckerRunnableExitsOnlyWhenClosedStatus()
   {
@@ -76,12 +62,13 @@ public class IndexerWorkerContextTest
            .thenReturn(Futures.immediateFuture(ServiceLocations.forLocation(new ServiceLocation("h", 1, -1, "/"))))
            // Done to check the behavior of the runnable, the situation of exiting after success might not occur actually
            .thenReturn(Futures.immediateFuture(ServiceLocations.forLocation(new ServiceLocation("h", 1, -1, "/"))))
+           .thenReturn(Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet())))
            .thenReturn(Futures.immediateFuture(ServiceLocations.closed()));

     final Worker workerMock = Mockito.mock(Worker.class);

     indexerWorkerContext.controllerCheckerRunnable(controllerLocatorMock, workerMock);
-    Mockito.verify(controllerLocatorMock, Mockito.times(3)).locate();
+    Mockito.verify(controllerLocatorMock, Mockito.times(4)).locate();
     Mockito.verify(workerMock, Mockito.times(1)).controllerFailed();
   }
 }
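The behavior change in this commit can be illustrated with a small, self-contained sketch. It is not Druid's implementation: the `LocationState` enum and `pollUntilClosed` method are hypothetical stand-ins for `ServiceLocations` and the checker loop, and the real checker also sleeps between polls and notifies the worker on exit. The point is only that an empty (unknown) location keeps the loop running, while "closed" ends it.

```java
import java.util.Arrays;
import java.util.Iterator;

public class ControllerCheckerSketch
{
  // Hypothetical stand-in for the three outcomes of a controller location lookup.
  enum LocationState
  {
    KNOWN,   // controller location is known
    EMPTY,   // location unknown, e.g. the Overlord has not acknowledged it yet
    CLOSED   // controller task is really no longer running
  }

  // Polls until a CLOSED state is seen (or the sequence ends) and returns the number of polls made.
  static int pollUntilClosed(Iterator<LocationState> polls)
  {
    int pollCount = 0;
    while (polls.hasNext()) {
      final LocationState state = polls.next();
      pollCount++;
      if (state == LocationState.CLOSED) {
        // Only "closed" ends the loop; EMPTY and KNOWN both mean "keep checking".
        break;
      }
    }
    return pollCount;
  }

  public static void main(String[] args)
  {
    // Same sequence as the updated test: two known locations, one empty, then closed.
    final Iterator<LocationState> polls = Arrays.asList(
        LocationState.KNOWN,
        LocationState.KNOWN,
        LocationState.EMPTY,
        LocationState.CLOSED
    ).iterator();
    System.out.println(pollUntilClosed(polls)); // prints 4
  }
}
```

With the pre-patch condition, the same sequence would have stopped at the third poll, which is why the test's expected `locate()` count moves from 3 to 4.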
Re: [PR] MSQ: Controller checker should check for "closed" only. (druid)
gianm merged PR #16161: URL: https://github.com/apache/druid/pull/16161
Re: [PR] Remove index_realtime and index_realtime_appenderator tasks. (druid)
github-actions[bot] commented on PR #15717: URL: https://github.com/apache/druid/pull/15717#issuecomment-2008385789 This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the d...@druid.apache.org list. Thank you for your contributions.
Re: [PR] Improve retry log messages in integration tests (druid)
github-actions[bot] commented on PR #14664: URL: https://github.com/apache/druid/pull/14664#issuecomment-2008385169 This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the d...@druid.apache.org list. Thank you for your contributions.
[PR] Parameterize segment IDs (druid)
abhishekrb19 opened a new pull request, #16174:
URL: https://github.com/apache/druid/pull/16174

Issue:
The `markUsed` API accepts either an `interval` or `segmentIds`. When using segment IDs, the underlying query uses an `IN` clause with the user-provided values inlined directly in the query instead of being parameterized.

Fix:
This patch fixes that by adding the following utilities specifically for queries that contain an `IN` clause:
- `getParameterizedInConditionForColumn()`
- `bindColumnValuesToQueryWithInCondition()`

Other related changes:
Renamed the following functions for consistency and code hygiene:
- `appendConditionForIntervalsAndMatchMode()` to `getConditionForIntervalsAndMatchMode()`. Pass only select arguments to the function.
- `bindQueryIntervals()` to `bindIntervalsToQuery()`

This PR has:
- [x] been self-reviewed.
- [ ] a release note entry in the PR description.
- [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [x] been tested in a test Druid cluster.
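As a rough illustration of the idea in this description (and not the PR's actual code), the sketch below builds an `IN` condition made of named placeholders and a separate map of bindings, so user-provided segment IDs never become part of the SQL text. The class, method names, and placeholder naming scheme (`:id0`, `:id1`, ...) are assumptions made for this example; the real utilities are the two named in the description, whose signatures are not shown in this message.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InConditionSketch
{
  // Hypothetical helper: returns e.g. "id IN (:id0, :id1)" for column "id" and two values.
  static String buildInCondition(String column, int valueCount)
  {
    if (valueCount <= 0) {
      throw new IllegalArgumentException("IN condition needs at least one value");
    }
    final StringBuilder sb = new StringBuilder(column).append(" IN (");
    for (int i = 0; i < valueCount; i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(':').append(column).append(i);
    }
    return sb.append(')').toString();
  }

  // Hypothetical helper: maps each placeholder name to its value, in order.
  // A real caller would pass these to the SQL library's bind call instead of inlining them.
  static Map<String, String> buildBindings(String column, List<String> values)
  {
    final Map<String, String> bindings = new LinkedHashMap<>();
    for (int i = 0; i < values.size(); i++) {
      bindings.put(column + i, values.get(i));
    }
    return bindings;
  }

  public static void main(String[] args)
  {
    final List<String> segmentIds = Arrays.asList("seg_a", "seg_b'; DROP TABLE x; --");
    System.out.println(buildInCondition("id", segmentIds.size()));
    System.out.println(buildBindings("id", segmentIds));
  }
}
```

Because the condition text depends only on the column name and the number of values, a malicious value such as the second ID above ends up as bound data rather than executable SQL.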
(druid) branch master updated: Expose Kinesis lag metrics (#16172)
This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git

The following commit(s) were added to refs/heads/master by this push:
     new e7cf8299cea Expose Kinesis lag metrics (#16172)
e7cf8299cea is described below

commit e7cf8299cea3915008323c38518aa472f36c63e5
Author: YongGang
AuthorDate: Tue Mar 19 16:42:10 2024 -0700

    Expose Kinesis lag metrics (#16172)
---
 .../statsd-emitter/src/main/resources/defaultMetricDimensions.json | 5 +
 1 file changed, 5 insertions(+)

diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
index 11fe43909d0..c3ad6e11efd 100644
--- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
+++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json
@@ -60,6 +60,11 @@
   "ingest/kafka/avgLag" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
   "ingest/kafka/partitionLag" : { "dimensions" : ["dataSource", "stream", "partition"], "type" : "gauge" },

+  "ingest/kinesis/lag/time" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
+  "ingest/kinesis/maxLag/time" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
+  "ingest/kinesis/avgLag/time" : { "dimensions" : ["dataSource", "stream"], "type" : "gauge" },
+  "ingest/kinesis/partitionLag/time" : { "dimensions" : ["dataSource", "stream", "partition"], "type" : "gauge" },
+
   "task/success/count" : { "dimensions" : ["dataSource"], "type" : "count" },
   "task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count" },
   "task/running/count" : { "dimensions" : ["dataSource"], "type" : "gauge" },
Re: [PR] Expose Kinesis lag metrics (druid)
georgew5656 merged PR #16172: URL: https://github.com/apache/druid/pull/16172
Re: [PR] Improve Task Monitoring and Analysis (druid)
YongGang commented on code in PR #16041:
URL: https://github.com/apache/druid/pull/16041#discussion_r1531222863

## indexing-service/src/main/java/org/apache/druid/indexing/common/task/DefaultTaskIdentitiesProvider.java: ##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.google.common.base.Strings;
+import org.apache.druid.query.DruidMetrics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultTaskIdentitiesProvider implements TaskIdentitiesProvider
+{
+  public static final String TYPE = "default";
+
+  @Override
+  public Map getTaskMetricTags(Task task)
+  {
+    String taskIdentifier = task.getType();

Review Comment:
   That's a valuable suggestion; however, implementing it could require modifications across numerous task types. I'd like to keep the scope of this PR focused and consider that as potential follow-up work.
Re: [PR] INSERT/REPLACE dimension target column types are validated against source input expressions (druid)
zachjsh commented on code in PR #15962:
URL: https://github.com/apache/druid/pull/15962#discussion_r1531222530

## sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java: ##
@@ -432,11 +456,18 @@ private RelDataType validateTargetType(
         fields.add(Pair.of(colName, sourceField.getType()));
         continue;
       }
-      RelDataType relType = typeFactory.createSqlType(SqlTypeName.get(sqlTypeName));
-      fields.add(Pair.of(
-          colName,
-          typeFactory.createTypeWithNullability(relType, true)
-      ));
+      if (NullHandling.replaceWithDefault()) {

Review Comment:
   Thanks! Fixed.
Re: [PR] Improve Task Monitoring and Analysis (druid)
YongGang commented on code in PR #16041:
URL: https://github.com/apache/druid/pull/16041#discussion_r1531217281

## indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskIdentitiesProvider.java: ##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.guice.annotations.ExtensionPoint;
+
+import java.util.Map;
+
+/**
+ * The TaskIdentitiesProvider interface helps add metric tags to tasks.
+ * It's meant to make task management, monitoring, and reporting better by providing extra information
+ * about tasks. Both user-defined and system-generated tags are included, making details about task
+ * performance and characteristics clearer in reports and summaries.
+ */
+@ExtensionPoint
+public interface TaskIdentitiesProvider

Review Comment:
   I believe `Information` might be too broad and doesn't capture the specific intent of this ExtensionPoint, which is focused on delivering task identity details for monitoring and management purposes.
Re: [PR] Improve Task Monitoring and Analysis (druid)
YongGang commented on code in PR #16041:
URL: https://github.com/apache/druid/pull/16041#discussion_r1531208204

## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java: ##
@@ -240,13 +241,14 @@ public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
         injector.getInstance(Key.get(ServiceClientFactory.class, EscalatedGlobal.class));
     final OverlordClient overlordClient = injector.getInstance(OverlordClient.class)
                                                   .withRetryPolicy(StandardRetryPolicy.unlimited());
+    final TaskIdentitiesProvider taskIdentitiesProvider = injector.getInstance(TaskIdentitiesProvider.class);

Review Comment:
   There should be only one type of provider configured and in use; otherwise, the usage will become too complicated.
[PR] Web console: fix Azure icon not rendering (druid)
vogievetsky opened a new pull request, #16173:
URL: https://github.com/apache/druid/pull/16173

No need to lowercase the icon.

https://github.com/apache/druid/assets/177816/5e0c7ca8-f10e-4aa9-b99f-0da7ed73f6e8
Re: [PR] Improve Task Monitoring and Analysis (druid)
arunramani commented on code in PR #16041: URL: https://github.com/apache/druid/pull/16041#discussion_r1531162898 ## indexing-service/src/main/java/org/apache/druid/indexing/common/task/DefaultTaskIdentitiesProvider.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import com.google.common.base.Strings; +import org.apache.druid.query.DruidMetrics; + +import java.util.HashMap; +import java.util.Map; + +public class DefaultTaskIdentitiesProvider implements TaskIdentitiesProvider +{ + public static final String TYPE = "default"; + + @Override + public Map getTaskMetricTags(Task task) + { +String taskIdentifier = task.getType(); Review Comment: Should we consider adding a `groupType` to the task object? Parsing the group ID name is pretty fragile.
Re: [PR] Improve Task Monitoring and Analysis (druid)
arunramani commented on code in PR #16041: URL: https://github.com/apache/druid/pull/16041#discussion_r1531160408 ## indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskIdentitiesProvider.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +import org.apache.druid.guice.annotations.ExtensionPoint; + +import java.util.Map; + +/** + * The TaskIdentitiesProvider interface helps add metric tags to tasks. + * It's meant to make task management, monitoring, and reporting better by providing extra information + * about tasks. Both user-defined and system-generated tags are included, making details about task + * performance and characteristics clearer in reports and summaries. + */ +@ExtensionPoint +public interface TaskIdentitiesProvider Review Comment: I think `TaskInformationProvider` is a simpler and cleaner name. Did we consider that?
Re: [PR] Improve Task Monitoring and Analysis (druid)
arunramani commented on code in PR #16041: URL: https://github.com/apache/druid/pull/16041#discussion_r1531158134 ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java: ## @@ -240,13 +241,14 @@ public TaskStatus runTask(final TaskToolbox toolbox) throws Exception injector.getInstance(Key.get(ServiceClientFactory.class, EscalatedGlobal.class)); final OverlordClient overlordClient = injector.getInstance(OverlordClient.class) .withRetryPolicy(StandardRetryPolicy.unlimited()); +final TaskIdentitiesProvider taskIdentitiesProvider = injector.getInstance(TaskIdentitiesProvider.class); Review Comment: We should use the provider to get ALL instances of `TaskIdentitiesProvider` and then provide that as a list to the controller.
[PR] Expose Kinesis lag metrics (druid)
YongGang opened a new pull request, #16172: URL: https://github.com/apache/druid/pull/16172 ### Description Expose Kinesis lag metrics so that alerts can be set up on them. Release note # Key changed/added classes in this PR * `defaultMetricDimensions.json` This PR has: - [ ] been self-reviewed. - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.) - [ ] added documentation for new or modified features or behaviors. - [ ] a release note entry in the PR description. - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links. - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md) - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader. - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met. - [ ] added integration tests. - [ ] been tested in a test Druid cluster.
Re: [PR] INSERT/REPLACE dimension target column types are validated against source input expressions (druid)
clintropolis commented on code in PR #15962: URL: https://github.com/apache/druid/pull/15962#discussion_r1530966917 ## sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidSqlValidator.java: ## @@ -432,11 +456,18 @@ private RelDataType validateTargetType( fields.add(Pair.of(colName, sourceField.getType())); continue; } - RelDataType relType = typeFactory.createSqlType(SqlTypeName.get(sqlTypeName)); - fields.add(Pair.of( - colName, - typeFactory.createTypeWithNullability(relType, true) - )); + if (NullHandling.replaceWithDefault()) { Review Comment: For string types, default value mode treats `null` and `''` interchangeably, so it probably should be marked as nullable? To be honest, it's not super well defined, but we do always mark string values as nullable in default value mode since empty strings behave as null. For numbers, though, default value mode effectively means that nulls don't exist. Default value mode is deprecated now, so we probably don't have to spend too much time thinking about how things should behave; we should probably match the computed schema behavior (strings always nullable in either mode, numbers only nullable in SQL-compatible mode).
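Editor's sketch: the rule proposed in the review comment above (strings always nullable in either mode, numbers nullable only in SQL-compatible mode) can be written as a small truth table. The class `NullabilitySketch`, the enum `ColumnKind`, and the method `isNullable` are hypothetical names introduced for illustration; they are not part of `DruidSqlValidator`, and the `replaceWithDefault` flag merely stands in for `NullHandling.replaceWithDefault()`.

```java
public class NullabilitySketch
{
  /** Hypothetical column kinds, for illustration only. */
  enum ColumnKind { STRING, NUMBER }

  /**
   * Returns whether a target column would be marked nullable under the rule
   * suggested in the review comment. replaceWithDefault == true models the
   * deprecated default value mode; false models SQL-compatible mode.
   */
  static boolean isNullable(ColumnKind kind, boolean replaceWithDefault)
  {
    if (kind == ColumnKind.STRING) {
      // Strings are always nullable: in default value mode '' behaves as null.
      return true;
    }
    // Numbers are nullable only when nulls exist, i.e. in SQL-compatible mode.
    return !replaceWithDefault;
  }

  public static void main(String[] args)
  {
    for (boolean replaceWithDefault : new boolean[]{true, false}) {
      for (ColumnKind kind : ColumnKind.values()) {
        System.out.println(
            kind + " replaceWithDefault=" + replaceWithDefault
            + " nullable=" + isNullable(kind, replaceWithDefault)
        );
      }
    }
  }
}
```

This is only a restatement of the comment, not the code that was merged; the actual validator works on Calcite `RelDataType` objects.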
Re: [PR] MetricsModule: inject DataSourceTaskIdHolder early (druid)
arunramani commented on PR #16140: URL: https://github.com/apache/druid/pull/16140#issuecomment-2007968070 > @arunramani Is it possible to add a test case of any sort for this? I don't think there's any easy way to unit test this specifically. However, the existing unit tests cover the injection working correctly.
(druid) branch master updated: SortMerge join support for IS NOT DISTINCT FROM. (#16003)
This is an automated email from the ASF dual-hosted git repository. gian pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/master by this push: new c96b215dd67 SortMerge join support for IS NOT DISTINCT FROM. (#16003) c96b215dd67 is described below commit c96b215dd67fad883b1f6651ed568426aae7070f Author: Gian Merlino AuthorDate: Tue Mar 19 12:02:13 2024 -0700 SortMerge join support for IS NOT DISTINCT FROM. (#16003) * SortMerge join support for IS NOT DISTINCT FROM. The patch adds a "requiredNonNullKeyParts" field to the sortMerge processor, which has the list of key parts that must be nonnull for an equijoin condition to match. Conditions with SQL "=" are present in the list; conditions with SQL "IS NOT DISTINCT FROM" are absent from the list. * Fix test. * Update javadoc. --- .../apache/druid/msq/querykit/DataSourcePlan.java | 8 +- .../common/SortMergeJoinFrameProcessor.java| 22 +- .../common/SortMergeJoinFrameProcessorFactory.java | 25 +++ .../SortMergeJoinFrameProcessorFactoryTest.java| 241 .../common/SortMergeJoinFrameProcessorTest.java| 242 + .../druid/frame/key/FrameComparisonWidget.java | 15 +- .../druid/frame/key/FrameComparisonWidgetImpl.java | 25 ++- .../frame/key/FrameComparisonWidgetImplTest.java | 17 +- .../druid/sql/calcite/planner/JoinAlgorithm.java | 17 -- .../druid/sql/calcite/CalciteJoinQueryTest.java| 22 +- 10 files changed, 576 insertions(+), 58 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 566b084ad36..56fae646a4a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -333,19 +333,15 @@ public class DataSourcePlan 
/** * Checks if the sortMerge algorithm can execute a particular join condition. * - * Two checks: - * (1) join condition on two tables "table1" and "table2" is of the form + * One check: join condition on two tables "table1" and "table2" is of the form * table1.columnA = table2.columnA && table1.columnB = table2.columnB && - * - * (2) join condition uses equals, not IS NOT DISTINCT FROM [sortMerge processor does not currently implement - * IS NOT DISTINCT FROM] */ private static boolean canUseSortMergeJoin(JoinConditionAnalysis joinConditionAnalysis) { return joinConditionAnalysis .getEquiConditions() .stream() -.allMatch(equality -> equality.getLeftExpr().isIdentifier() && !equality.isIncludeNull()); +.allMatch(equality -> equality.getLeftExpr().isIdentifier()); } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java index 4b3854883a2..0fd85c6d082 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java @@ -138,6 +138,7 @@ public class SortMergeJoinFrameProcessor implements FrameProcessor FrameWriterFactory frameWriterFactory, String rightPrefix, List> keyColumns, + int[] requiredNonNullKeyParts, JoinType joinType, long maxBufferedBytes ) @@ -148,8 +149,8 @@ public class SortMergeJoinFrameProcessor implements FrameProcessor this.rightPrefix = rightPrefix; this.joinType = joinType; this.trackers = ImmutableList.of( -new Tracker(left, keyColumns.get(LEFT), maxBufferedBytes), -new Tracker(right, keyColumns.get(RIGHT), maxBufferedBytes) +new Tracker(left, keyColumns.get(LEFT), requiredNonNullKeyParts, maxBufferedBytes), +new Tracker(right, keyColumns.get(RIGHT), 
requiredNonNullKeyParts, maxBufferedBytes) ); this.maxBufferedBytes = maxBufferedBytes; } @@ -195,7 +196,7 @@ public class SortMergeJoinFrameProcessor implements FrameProcessor // Two rows match if the keys compare equal _and_ neither key has a null component. (x JOIN y ON x.a = y.a does // not match rows where "x.a" is null.) - final boolean marksMatch = markCmp == 0 && trackers.get(LEFT).hasCompletelyNonNullMark(); + final boolean marksMatch = markCmp == 0 && trackers.get(LEFT).markHasRequiredNonNullKeyParts(); // If marked keys are equal on both sides ("marksMatch"), at least one side needs to
Re: [PR] SortMerge join support for IS NOT DISTINCT FROM. (druid)
gianm merged PR #16003: URL: https://github.com/apache/druid/pull/16003
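Editor's sketch: the commit message for #16003 describes a "requiredNonNullKeyParts" list holding the key parts that must be non-null for an equijoin condition to match (parts joined with SQL "=" are in the list, parts joined with "IS NOT DISTINCT FROM" are not). The snippet below illustrates that matching rule on plain object arrays. `JoinKeyMatchSketch` and `keysMatch` are hypothetical names; the real processor compares frame keys through `FrameComparisonWidget`, which is not reproduced here.

```java
import java.util.Objects;

public class JoinKeyMatchSketch
{
  /**
   * Two keys match if every part compares equal (null equals null here) and
   * every part listed in requiredNonNullKeyParts is non-null. Because the keys
   * already compare equal, checking the left key for nulls is sufficient.
   */
  static boolean keysMatch(Object[] left, Object[] right, int[] requiredNonNullKeyParts)
  {
    if (left.length != right.length) {
      return false;
    }
    for (int i = 0; i < left.length; i++) {
      if (!Objects.equals(left[i], right[i])) {
        return false;
      }
    }
    for (int part : requiredNonNullKeyParts) {
      if (left[part] == null) {
        // This part was joined with "=", so a null never matches.
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    Object[] left = {null, "a"};
    Object[] right = {null, "a"};
    // Part 0 uses IS NOT DISTINCT FROM (not required non-null); part 1 uses "=".
    System.out.println(keysMatch(left, right, new int[]{1}));
    // Both parts use "=", so the null in part 0 prevents a match.
    System.out.println(keysMatch(left, right, new int[]{0, 1}));
  }
}
```

With an empty list every condition behaves like IS NOT DISTINCT FROM; with all parts listed the behavior matches the old "completely non-null" check that the diff replaces.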
(druid) annotated tag druid-29.0.1-rc1 updated (15b3efd085d -> bf6755cd9f6)
This is an automated email from the ASF dual-hosted git repository. karan pushed a change to annotated tag druid-29.0.1-rc1 in repository https://gitbox.apache.org/repos/asf/druid.git *** WARNING: tag druid-29.0.1-rc1 was modified! *** from 15b3efd085d (commit) to bf6755cd9f6 (tag) tagging 15b3efd085d3e69417a00d06b04e325fd26e729a (commit) replaces druid-0.8.0-rc1 by cryptoe on Tue Mar 19 21:54:32 2024 +0530 - Log - [maven-release-plugin] copy for tag druid-29.0.1-rc1 --- No new revisions were added by this update. Summary of changes:
Re: [PR] Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building (druid)
github-advanced-security[bot] commented on code in PR #15817: URL: https://github.com/apache/druid/pull/15817#discussion_r1530758427 ## server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java: ## @@ -1080,6 +1112,147 @@ ); } + private void doPollSegmentAndSchema() + { +log.info("Starting polling of segment and schema table"); + +ConcurrentMap segmentStats = new ConcurrentHashMap<>(); + +// some databases such as PostgreSQL require auto-commit turned off +// to stream results back, enabling transactions disables auto-commit +// +// setting connection to read-only will allow some database such as MySQL +// to automatically use read-only transaction mode, further optimizing the query +final List segments = connector.inReadOnlyTransaction( +new TransactionCallback>() +{ + @Override + public List inTransaction(Handle handle, TransactionStatus status) + { +return handle +.createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable())) +.setFetchSize(connector.getStreamingFetchSize()) +.map( +new ResultSetMapper() +{ + @Override + public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException + { +try { + DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class); + segmentStats.put( + segment.getId(), + new SegmentSchemaCache.SegmentStats( + (Long) r.getObject("schema_id"), + (Long) r.getObject("num_rows") + ) + ); + return replaceWithExistingSegmentIfPresent(segment); +} +catch (IOException e) { + log.makeAlert(e, "Failed to read segment from db.").emit(); + // If one entry in database is corrupted doPoll() should continue to work overall. See + // filter by `Objects::nonNull` below in this method. 
+ return null; +} + } +} +) +.list(); + } +} +); + +Map schemaMap = new HashMap<>(); + +String schemaPollQuery; +if (latestSchemaId == null) { + schemaPollQuery = StringUtils.format("SELECT id, payload FROM %s", getSegmentSchemaTable()); +} else { + schemaPollQuery = StringUtils.format( + "SELECT id, payload FROM %1$s where id > '%2$s'", + getSegmentSchemaTable(), + latestSchemaId); +} +String finalSchemaPollQuery = schemaPollQuery; + +final AtomicReference maxPolledId = new AtomicReference<>(); +maxPolledId.set(latestSchemaId); + +connector.inReadOnlyTransaction(new TransactionCallback() +{ + @Override + public Object inTransaction(Handle handle, TransactionStatus status) + { +return handle.createQuery(finalSchemaPollQuery) + .map(new ResultSetMapper() + { + @Override + public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException + { + try { + Long id = r.getLong("id"); Review Comment: ## Boxed variable is never null The variable 'id' is only assigned values of primitive type and is never 'null', but it is declared with the boxed type 'Long'. [Show more details](https://github.com/apache/druid/security/code-scanning/7155) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org
[PR] Populate segment stats for non-parallel compaction jobs (druid)
adithyachakilam opened a new pull request, #16171: URL: https://github.com/apache/druid/pull/16171 ### Description This PR populates the count of segments read/published for sequential compaction tasks. Release note # Key changed/added classes in this PR * `IndexTask` This PR has: - [ ] been self-reviewed. - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.) - [ ] added documentation for new or modified features or behaviors. - [ ] a release note entry in the PR description. - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links. - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md) - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader. - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met. - [ ] added integration tests. - [ ] been tested in a test Druid cluster.
Re: [PR] MSQ controller: Support in-memory shuffles; towards JVM reuse. (druid)
github-advanced-security[bot] commented on code in PR #16168: URL: https://github.com/apache/druid/pull/16168#discussion_r1530710796 ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java: ## @@ -2995,15 +2873,14 @@ */ private interface TaskContactFn { -ListenableFuture contactTask(WorkerClient client, String taskId, int workerNumber); +ListenableFuture contactTask(WorkerClient client, String workerId, int workerNumber); } /** * Interface used when {@link TaskContactFn#contactTask(WorkerClient, String, int)} returns a successful future. */ private interface TaskContactSuccess { -void onSuccess(String taskId); - +void onSuccess(String workerId, int workerNumber); Review Comment: ## Useless parameter The parameter 'workerId' is never used. [Show more details](https://github.com/apache/druid/security/code-scanning/7150)
(druid) branch master updated: Add versions to `markUsed` and `markUnused` APIs (#16141)
This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/master by this push: new fa8e5114921 Add versions to `markUsed` and `markUnused` APIs (#16141) fa8e5114921 is described below commit fa8e51149216a367e9b303961c784a99e69de138 Author: Abhishek Radhakrishnan AuthorDate: Tue Mar 19 09:22:25 2024 -0700 Add versions to `markUsed` and `markUnused` APIs (#16141) * Mark used and unused APIs by versions. * remove the conditional invocations. * isValid() and test updates. * isValid() and tests. * Remove warning logs for invalid user requests. Also, downgrade visibility. * Update resp message, etc. * tests and some cleanup. * Docs draft * Clarify docs * Update server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java Co-authored-by: Kashif Faraz * Review comments * Remove default interface methods only used in tests and update docs. * Clarify javadocs and @Nullable. * Add more tests. * Parameterized versions. 
- Co-authored-by: Kashif Faraz --- docs/api-reference/data-management-api.md | 28 +- .../common/actions/MarkSegmentsAsUnusedAction.java | 5 +- .../common/task/KillUnusedSegmentsTaskTest.java| 21 +- .../druid/metadata/SegmentsMetadataManager.java| 16 +- .../druid/metadata/SqlSegmentsMetadataManager.java | 35 ++- .../druid/metadata/SqlSegmentsMetadataQuery.java | 146 --- .../druid/server/http/DataSourcesResource.java | 60 +++-- .../metadata/SqlSegmentsMetadataManagerTest.java | 257 +- .../apache/druid/metadata/TestDerbyConnector.java | 2 +- .../simulate/TestSegmentsMetadataManager.java | 4 +- .../druid/server/http/DataSourcesResourceTest.java | 286 + 11 files changed, 708 insertions(+), 152 deletions(-) diff --git a/docs/api-reference/data-management-api.md b/docs/api-reference/data-management-api.md index 4adeaa8b208..754bf62f725 100644 --- a/docs/api-reference/data-management-api.md +++ b/docs/api-reference/data-management-api.md @@ -206,7 +206,8 @@ Marks the state of a group of segments as unused, using an array of segment IDs Pass the array of segment IDs or interval as a JSON object in the request body. For the interval, specify the start and end times as ISO 8601 strings to identify segments inclusive of the start time and exclusive of the end time. -Druid only updates the segments completely contained within the specified interval; partially overlapping segments are not affected. +Optionally, specify an array of segment versions with interval. Druid updates only the segments completely contained +within the specified interval that match the optional list of versions; partially overlapping segments are not affected. 
URL @@ -214,12 +215,13 @@ Druid only updates the segments completely contained within the specified interv Request body -The group of segments is sent as a JSON request payload that accepts one of the following properties: +The group of segments is sent as a JSON request payload that accepts the following properties: -|Property|Description|Example| -|--|-|-| -|`interval`|ISO 8601 segments interval.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| -|`segmentIds`|Array of segment IDs.|`["segmentId1", "segmentId2"]`| +|Property|Description|Required|Example| +|--|-|-|-| +|`interval`|ISO 8601 segments interval.|Yes, if `segmentIds` is not specified.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| +|`segmentIds`|List of segment IDs.|Yes, if `interval` is not specified.|`["segmentId1", "segmentId2"]`| +|`versions`|List of segment versions. Must be provided with `interval`.|No.|`["2024-03-14T16:00:04.086Z", ""2024-03-12T16:00:04.086Z"]`| Responses @@ -306,7 +308,8 @@ Marks the state of a group of segments as used, using an array of segment IDs or Pass the array of segment IDs or interval as a JSON object in the request body. For the interval, specify the start and end times as ISO 8601 strings to identify segments inclusive of the start time and exclusive of the end time. -Druid only updates the segments completely contained within the specified interval; partially overlapping segments are not affected. +Optionally, specify an array of segment versions with interval. Druid updates only the segments completely contained +within the specified interval that match the optional list of versions; partially overlapping segments are not affected. URL @@ -314,12 +317,13 @@ Druid only updates the segments completely contained within the specified interv Request body -The group of segments is sent
Re: [PR] Add versions to `markUsed` and `markUnused` APIs (druid)
abhishekrb19 merged PR #16141: URL: https://github.com/apache/druid/pull/16141
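Editor's sketch: the documentation table added in #16141 states that `interval` is required if `segmentIds` is not specified, `segmentIds` is required if `interval` is not specified, and `versions` must be provided together with `interval`. A minimal check of just those three statements might look like the following. `MarkSegmentsPayloadSketch` and its `isValid` method are hypothetical and are not the `isValid()` mentioned in the commit log; in particular, whether `interval` and `segmentIds` may be combined in one request is not stated in the excerpt, so this sketch does not reject that case.

```java
import java.util.List;

public class MarkSegmentsPayloadSketch
{
  /**
   * Checks a markUsed/markUnused request payload against the rules stated in
   * the docs table: at least one of interval or segmentIds must be present,
   * and versions may only accompany an interval.
   */
  static boolean isValid(String interval, List<String> segmentIds, List<String> versions)
  {
    boolean hasInterval = interval != null && !interval.isEmpty();
    boolean hasSegmentIds = segmentIds != null && !segmentIds.isEmpty();
    boolean hasVersions = versions != null && !versions.isEmpty();

    if (!hasInterval && !hasSegmentIds) {
      // Neither selector given: nothing identifies the segments.
      return false;
    }
    if (hasVersions && !hasInterval) {
      // versions is documented as "Must be provided with interval".
      return false;
    }
    return true;
  }
}
```

For example, an interval of `2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z` with one version passes, while `segmentIds` combined with `versions` and no interval does not.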
Re: [PR] Add versions to `markUsed` and `markUnused` APIs (druid)
abhishekrb19 commented on PR #16141: URL: https://github.com/apache/druid/pull/16141#issuecomment-2007616191 Thanks for the reviews, @kfaraz and @zachjsh! @kfaraz, if you have more comments, I'm happy to address them in a follow-up.
(druid) branch master updated (e3b75ac11f5 -> 1ad489a2aef)
This is an automated email from the ASF dual-hosted git repository. gian pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/druid.git from e3b75ac11f5 Fix FloatFirstVectorAggregationTest (#16165) add 1ad489a2aef Fix build: newTempFolder (#16170) No new revisions were added by this update. Summary of changes: .../src/test/java/org/apache/druid/msq/exec/MSQExportTest.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
Re: [PR] Fix build: newTempFolder (druid)
gianm merged PR #16170: URL: https://github.com/apache/druid/pull/16170
Re: [PR] Update Kubernetes doc for Helm Chart (druid)
gianm commented on code in PR #16045: URL: https://github.com/apache/druid/pull/16045#discussion_r1530603864 ## docs/operations/kubernetes.md: ## @@ -25,10 +25,20 @@ title: "kubernetes" Apache Druid distribution is also available as [Docker](https://www.docker.com/) image from [Docker Hub](https://hub.docker.com/r/apache/druid) . For example, you can obtain latest release using the command below. -``` +```bash $ docker pull apache/druid ``` -[druid-operator](https://github.com/datainfrahq/druid-operator) can be used to manage a Druid cluster on [Kubernetes](https://kubernetes.io/) . +[Apache Druid Helm Chart](https://github.com/asdf2014/druid-helm) can be used to deploy a Druid cluster on Kubernetes with following commands: Review Comment: When might someone want to use the operator and when might someone want to use the helm chart? Would it be clear to someone familiar with k8s what the right choice would be? I wonder about this whenever thinking about adding options to a documentation page-- we should try to help people choose an option, unless it would be obvious. (I don't have much personal experience with k8s so I am not sure if the choice of helm chart vs operator is obvious or not.)
[PR] Fix build: newTempFolder (druid)
kgyrtkirk opened a new pull request, #16170: URL: https://github.com/apache/druid/pull/16170 (no comment)
(druid) branch 29.0.1 updated: [Backport] MSQ: Validate that strings and string arrays are not mixed. (#15920) (#16160)
This is an automated email from the ASF dual-hosted git repository. karan pushed a commit to branch 29.0.1 in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/29.0.1 by this push: new b1a8243cf39 [Backport] MSQ: Validate that strings and string arrays are not mixed. (#15920) (#16160) b1a8243cf39 is described below commit b1a8243cf390945d575d7a3acc0fb217af91c729 Author: Karan Kumar AuthorDate: Tue Mar 19 19:41:54 2024 +0530 [Backport] MSQ: Validate that strings and string arrays are not mixed. (#15920) (#16160) * Cherry-picking 15920-to-29.0.1 * Fixing extra test case which got added as part of merge - Co-authored-by: Gian Merlino --- docs/multi-stage-query/concepts.md | 4 +- docs/multi-stage-query/reference.md| 3 +- docs/querying/arrays.md| 62 - docs/querying/multi-value-dimensions.md| 4 +- .../org/apache/druid/msq/exec/ControllerImpl.java | 10 +- .../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 274 - .../druid/msq/util/DimensionSchemaUtils.java | 130 ++ .../druid/msq/util/MultiStageQueryContext.java | 29 ++- .../org/apache/druid/msq/exec/MSQArraysTest.java | 187 +- .../org/apache/druid/msq/exec/MSQInsertTest.java | 2 +- .../druid/msq/test/CalciteMSQTestsHelper.java | 4 +- .../druid/msq/util/DimensionSchemaUtilsTest.java | 6 +- .../druid/msq/util/MultiStageQueryContextTest.java | 2 +- .../druid/sql/avatica/DruidAvaticaHandlerTest.java | 12 + .../druid/sql/calcite/CalciteArraysQueryTest.java | 210 +++- .../apache/druid/sql/calcite/CalciteQueryTest.java | 2 + .../druid/sql/calcite/util/CalciteTests.java | 1 + .../druid/sql/calcite/util/TestDataBuilder.java| 34 +++ 18 files changed, 650 insertions(+), 326 deletions(-) diff --git a/docs/multi-stage-query/concepts.md b/docs/multi-stage-query/concepts.md index 27b7d12c91c..cae88a0f375 100644 --- a/docs/multi-stage-query/concepts.md +++ b/docs/multi-stage-query/concepts.md @@ -200,8 +200,8 @@ To perform ingestion with rollup: 2. 
Set [`finalizeAggregations: false`](reference.md#context-parameters) in your context. This causes aggregation functions to write their internal state to the generated segments, instead of the finalized end result, and enables further aggregation at query time. -3. See [ARRAY types](../querying/arrays.md#sql-based-ingestion-with-rollup) for information about ingesting `ARRAY` columns -4. See [multi-value dimensions](../querying/multi-value-dimensions.md#sql-based-ingestion-with-rollup) for information to ingest multi-value VARCHAR columns +3. See [ARRAY types](../querying/arrays.md#sql-based-ingestion) for information about ingesting `ARRAY` columns +4. See [multi-value dimensions](../querying/multi-value-dimensions.md#sql-based-ingestion) for information to ingest multi-value VARCHAR columns When you do all of these things, Druid understands that you intend to do an ingestion with rollup, and it writes rollup-related metadata into the generated segments. Other applications can then use [`segmentMetadata` diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 25f55b31f74..a56fae0ab2f 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -325,7 +325,7 @@ The following table lists the context parameters for the MSQ task engine: | `maxNumTasks` | SELECT, INSERT, REPLACEThe maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority. | 2 | | `taskAssignment` | SELECT, INSERT, REPLACEDetermines how many tasks to use. 
Possible values include: `max`: Uses as many tasks as possible, up to `maxNumTasks`.`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 512 MiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When calculating the size of files, [...] | `finalizeAggregations` | SELECT, INSERT, REPLACEDetermines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggr [...] -|
Re: [PR] [Backport] MSQ: Validate that strings and string arrays are not mixed. (#15920) (druid)
cryptoe merged PR #16160: URL: https://github.com/apache/druid/pull/16160
Re: [PR] [Backport] Adding null check to earliest and latest aggs (druid)
cryptoe commented on PR #16164: URL: https://github.com/apache/druid/pull/16164#issuecomment-2007133209 The HTTP emitter test failure is due to a flake.
Re: [PR] [Backport] Fix FloatFirstVectorAggregationTest (druid)
cryptoe merged PR #16169: URL: https://github.com/apache/druid/pull/16169
(druid) branch 29.0.1 updated: Fix FloatFirstVectorAggregationTest (#16165) (#16169)
This is an automated email from the ASF dual-hosted git repository. karan pushed a commit to branch 29.0.1 in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/29.0.1 by this push: new efe5ec06162 Fix FloatFirstVectorAggregationTest (#16165) (#16169) efe5ec06162 is described below commit efe5ec06162aaa6ed862f0dd227aae4a0aa220ae Author: Karan Kumar AuthorDate: Tue Mar 19 18:00:44 2024 +0530 Fix FloatFirstVectorAggregationTest (#16165) (#16169) Co-authored-by: Laksh Singla --- .../query/aggregation/first/FloatFirstVectorAggregationTest.java | 8 +++- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java index 28044de5e4e..a95e89bd32a 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java @@ -64,9 +64,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest new SerializablePairLongFloat(2345300L, 4.2F) }; - private final SerializablePairLongFloat[] null_pairs = { - null, null, null, null - }; + private final SerializablePairLongFloat[] nullPairs = {null, null, null, null}; @@ -130,7 +128,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest @Override public Object[] getObjectVector() { -return null_pairs; +return nullPairs; } @Override @@ -287,7 +285,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest target1.aggregate(buf, 0, 0, VALUES.length); Pair result = (Pair) target1.get(buf, 0); Assert.assertEquals(Long.MAX_VALUE, result.lhs.longValue()); -Assert.assertNull(result.rhs); +Assert.assertEquals(NullHandling.defaultFloatValue(), 
result.rhs); } @Test
Re: [PR] [Backport] Fix FloatFirstVectorAggregationTest (druid)
cryptoe commented on PR #16169: URL: https://github.com/apache/druid/pull/16169#issuecomment-2007054921 Only test case fix. Merging this.
[PR] [Backport] Fix FloatFirstVectorAggregationTest (druid)
cryptoe opened a new pull request, #16169: URL: https://github.com/apache/druid/pull/16169 Backport of #16165 to 29.0.1.
(druid) branch 29.0.1 updated: Adding null check to earliest and latest aggs (#15972) (#16164)
This is an automated email from the ASF dual-hosted git repository. karan pushed a commit to branch 29.0.1 in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/29.0.1 by this push: new ced9e63969e Adding null check to earliest and latest aggs (#15972) (#16164) ced9e63969e is described below commit ced9e63969eaab223e75aec66400df62593cf131 Author: Karan Kumar AuthorDate: Tue Mar 19 17:58:22 2024 +0530 Adding null check to earliest and latest aggs (#15972) (#16164) * Adding null check to earliest and latest aggs * Native tests for null inPairs Co-authored-by: Soumyava <93540295+somu-im...@users.noreply.github.com> --- .../first/NumericFirstVectorAggregator.java| 2 +- .../last/NumericLastVectorAggregator.java | 2 +- .../first/FloatFirstVectorAggregationTest.java | 38 ++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java index b487ac817d5..6f6bfec014c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstVectorAggregator.java @@ -95,7 +95,7 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator if (objectsWhichMightBeNumeric != null) { final SerializablePair inPair = (SerializablePair) objectsWhichMightBeNumeric[index]; -if (inPair.lhs != null && inPair.lhs < firstTime) { +if (inPair != null && inPair.lhs != null && inPair.lhs < firstTime) { firstTime = inPair.lhs; if (useDefault || inPair.rhs != null) { updateTimeWithValue(buf, position, firstTime, index); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastVectorAggregator.java 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastVectorAggregator.java index ff5fa3af9e9..a556c3fea18 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastVectorAggregator.java @@ -96,7 +96,7 @@ public abstract class NumericLastVectorAggregator implements VectorAggregator if (objectsWhichMightBeNumeric != null) { final SerializablePair inPair = (SerializablePair) objectsWhichMightBeNumeric[index]; - if (inPair.lhs != null && inPair.lhs >= lastTime) { + if (inPair != null && inPair.lhs != null && inPair.lhs >= lastTime) { lastTime = inPair.lhs; if (useDefault || inPair.rhs != null) { updateTimeWithValue(buf, position, lastTime, index); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java index c7e8210d6c6..28044de5e4e 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java @@ -64,12 +64,18 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest new SerializablePairLongFloat(2345300L, 4.2F) }; + private final SerializablePairLongFloat[] null_pairs = { + null, null, null, null + }; + private VectorObjectSelector selector; + private VectorObjectSelector selector1; private BaseLongVectorValueSelector timeSelector; private ByteBuffer buf; private FloatFirstVectorAggregator target; + private FloatFirstVectorAggregator target1; private FloatFirstAggregatorFactory floatFirstAggregatorFactory; private VectorColumnSelectorFactory selectorFactory; @@ -119,6 +125,27 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest } }; 
+selector1 = new VectorObjectSelector() +{ + @Override + public Object[] getObjectVector() + { +return null_pairs; + } + + @Override + public int getMaxVectorSize() + { +return 4; + } + + @Override + public int getCurrentVectorSize() + { +return 0; + } +}; + nonFloatValueSelector = new BaseLongVectorValueSelector(new NoFilterVectorOffset( LONG_VALUES.length, 0, @@ -219,6 +246,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest }; target = new FloatFirstVectorAggregator(timeSelector, selector); +target1 = new FloatFirstVectorAggregator(timeSelector, selector1); clearBufferForPositions(0, 0);
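The null guard added in the diff above (`inPair != null && inPair.lhs != null && inPair.lhs < firstTime`) can be illustrated with a minimal, self-contained sketch. This is not Druid code: `TimeValuePair` and `earliestTime` are hypothetical stand-ins for `SerializablePair` and the aggregator loop, and only show why an all-null object vector, like the one the new test supplies, leaves the first-seen time at `Long.MAX_VALUE` instead of throwing a `NullPointerException`.

```java
public class NullSafeFirstSketch
{
  /** Hypothetical stand-in for Druid's SerializablePair; not the real class. */
  static final class TimeValuePair
  {
    final Long lhs;   // timestamp, may be null
    final Float rhs;  // value, may be null

    TimeValuePair(Long lhs, Float rhs)
    {
      this.lhs = lhs;
      this.rhs = rhs;
    }
  }

  /**
   * Returns the earliest non-null timestamp among the pairs,
   * or Long.MAX_VALUE when every entry (or every timestamp) is null.
   */
  static long earliestTime(TimeValuePair[] pairs)
  {
    long firstTime = Long.MAX_VALUE;
    for (TimeValuePair inPair : pairs) {
      // Same shape as the patched condition: check the pair itself before its fields.
      if (inPair != null && inPair.lhs != null && inPair.lhs < firstTime) {
        firstTime = inPair.lhs;
      }
    }
    return firstTime;
  }

  public static void main(String[] args)
  {
    TimeValuePair[] allNull = {null, null, null, null};
    TimeValuePair[] mixed = {
        null,
        new TimeValuePair(2345300L, 4.2F),
        new TimeValuePair(2345001L, 1.2F)
    };
    System.out.println(earliestTime(allNull) == Long.MAX_VALUE); // prints "true"
    System.out.println(earliestTime(mixed));                     // prints "2345001"
  }
}
```

Without the leading `inPair != null` check, the first loop iteration over `allNull` would dereference a null element, which appears to be the failure mode the commit addresses.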
Re: [PR] [Backport] Adding null check to earliest and latest aggs (druid)
cryptoe merged PR #16164: URL: https://github.com/apache/druid/pull/16164
(druid) branch 29.0.1 updated: Fix incorrect header names for certain export queries (#16096) (#16167)
This is an automated email from the ASF dual-hosted git repository. karan pushed a commit to branch 29.0.1 in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/29.0.1 by this push: new e1dc60f21d7 Fix incorrect header names for certain export queries (#16096) (#16167) e1dc60f21d7 is described below commit e1dc60f21d7d9ec502c1f256ac27b5e6abd92792 Author: Adarsh Sanjeev AuthorDate: Tue Mar 19 17:46:42 2024 +0530 Fix incorrect header names for certain export queries (#16096) (#16167) * Fix incorrect header names for certain queries * Fix incorrect header names for certain queries * Maintain upgrade compatibility * Fix tests * Change null handling --- .../org/apache/druid/msq/exec/ControllerImpl.java | 3 +- .../results/ExportResultsFrameProcessor.java | 52 +++--- .../ExportResultsFrameProcessorFactory.java| 18 +++- .../org/apache/druid/msq/exec/MSQExportTest.java | 108 +++-- .../ExportResultsFrameProcessorFactoryTest.java| 52 ++ 5 files changed, 183 insertions(+), 50 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 5da74f0a52a..bb3c4369527 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1913,7 +1913,8 @@ public class ControllerImpl implements Controller .processorFactory(new ExportResultsFrameProcessorFactory( queryId, exportStorageProvider, - resultFormat + resultFormat, + columnMappings )) ); return builder.build(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java index 
de65d3e9d7a..52697578b07 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java @@ -21,6 +21,8 @@ package org.apache.druid.msq.querykit.results; import com.fasterxml.jackson.databind.ObjectMapper; import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -35,13 +37,14 @@ import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.msq.counters.ChannelCounters; -import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.util.SequenceUtils; import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.storage.StorageConnector; @@ -60,6 +63,8 @@ public class ExportResultsFrameProcessor implements FrameProcessor private final ObjectMapper jsonMapper; private final ChannelCounters channelCounter; final String exportFilePath; + private final Object2IntMap outputColumnNameToFrameColumnNumberMap; + private final RowSignature exportRowSignature; public ExportResultsFrameProcessor( final ReadableFrameChannel inputChannel, @@ -68,7 +73,8 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor final StorageConnector storageConnector, final ObjectMapper jsonMapper, final ChannelCounters channelCounter, - final String exportFilePath + final String exportFilePath, + final ColumnMappings columnMappings ) { this.inputChannel = inputChannel; @@ -78,6 +84,30 @@ public class ExportResultsFrameProcessor implements FrameProcessor this.jsonMapper = jsonMapper; this.channelCounter = channelCounter; this.exportFilePath = exportFilePath; +this.outputColumnNameToFrameColumnNumberMap = new
Re: [PR] [Backport] Fix incorrect header names for certain export queries (#16096) (druid)
cryptoe merged PR #16167: URL: https://github.com/apache/druid/pull/16167
(druid) branch master updated: Fix FloatFirstVectorAggregationTest (#16165)
This is an automated email from the ASF dual-hosted git repository. karan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/master by this push: new e3b75ac11f5 Fix FloatFirstVectorAggregationTest (#16165) e3b75ac11f5 is described below commit e3b75ac11f5792fe6c633a85c447f54b2bb36d4d Author: Laksh Singla AuthorDate: Tue Mar 19 17:42:29 2024 +0530 Fix FloatFirstVectorAggregationTest (#16165) --- .../query/aggregation/first/FloatFirstVectorAggregationTest.java | 8 +++- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java index 28044de5e4e..a95e89bd32a 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java @@ -64,9 +64,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest new SerializablePairLongFloat(2345300L, 4.2F) }; - private final SerializablePairLongFloat[] null_pairs = { - null, null, null, null - }; + private final SerializablePairLongFloat[] nullPairs = {null, null, null, null}; @@ -130,7 +128,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest @Override public Object[] getObjectVector() { -return null_pairs; +return nullPairs; } @Override @@ -287,7 +285,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest target1.aggregate(buf, 0, 0, VALUES.length); Pair result = (Pair) target1.get(buf, 0); Assert.assertEquals(Long.MAX_VALUE, result.lhs.longValue()); -Assert.assertNull(result.rhs); +Assert.assertEquals(NullHandling.defaultFloatValue(), result.rhs); } @Test
Re: [PR] Fix FloatFirstVectorAggregationTest (druid)
cryptoe merged PR #16165: URL: https://github.com/apache/druid/pull/16165
Re: [PR] feat: support iterable for druid lookups-cached-single extension (druid)
TessaIO commented on PR #16124: URL: https://github.com/apache/druid/pull/16124#issuecomment-2006959133 ping @cryptoe. Can you also drop a quick review on this?
Re: [PR] Update Calcite*Test to use junit5 (druid)
gianm merged PR #16106: URL: https://github.com/apache/druid/pull/16106
Re: [PR] MSQ controller: Support in-memory shuffles; towards JVM reuse. (druid)
github-advanced-security[bot] commented on code in PR #16168: URL: https://github.com/apache/druid/pull/16168#discussion_r1530125108 ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java: ## @@ -53,32 +71,33 @@ DruidNode selfNode(); /** - * Provide access to the Coordinator service. + * Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}. */ - CoordinatorClient coordinatorClient(); + InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager); Review Comment: ## Useless parameter The parameter 'workerManager' is never used. [Show more details](https://github.com/apache/druid/security/code-scanning/7149) ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/ControllerResource.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.rpc; + +import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.msq.counters.CounterSnapshots; +import org.apache.druid.msq.counters.CounterSnapshotsTree; +import org.apache.druid.msq.exec.Controller; +import org.apache.druid.msq.exec.ControllerClient; +import org.apache.druid.msq.indexing.MSQTaskList; +import org.apache.druid.msq.indexing.error.MSQErrorReport; +import org.apache.druid.msq.kernel.StageId; +import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation; +import org.apache.druid.server.security.AuthorizerMapper; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.util.List; +import java.util.Map; + +public class ControllerResource +{ + private final Controller controller; + private final ResourcePermissionMapper permissionMapper; + private final AuthorizerMapper authorizerMapper; + + public ControllerResource( + final Controller controller, + final ResourcePermissionMapper permissionMapper, + final AuthorizerMapper authorizerMapper + ) + { +this.controller = controller; +this.permissionMapper = permissionMapper; +this.authorizerMapper = authorizerMapper; + } + + /** + * Used by subtasks to post {@link PartialKeyStatisticsInformation} for shuffling stages. + * + * See {@link ControllerClient#postPartialKeyStatistics(StageId, int, PartialKeyStatisticsInformation)} + * for the client-side code that calls this API. 
+ */ + @POST + @Path("/partialKeyStatisticsInformation/{queryId}/{stageNumber}/{workerNumber}") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + public Response httpPostPartialKeyStatistics( + final Object partialKeyStatisticsObject, + @PathParam("queryId") final String queryId, + @PathParam("stageNumber") final int stageNumber, + @PathParam("workerNumber") final int workerNumber, + @Context final HttpServletRequest req + ) + { +MSQResourceUtils.authorizeAdminRequest(permissionMapper, authorizerMapper, req); +controller.updatePartialKeyStatisticsInformation(stageNumber, workerNumber, partialKeyStatisticsObject); +return Response.status(Response.Status.ACCEPTED).build(); + } + + /** + * Used by subtasks to post system errors. Note that the errors are organized by taskId, not by query/stage/worker, + * because system errors are associated with a task rather than a specific query/stage/worker execution context. + * + * See {@link ControllerClient#postWorkerError} for the client-side code that calls this API. + */ + @POST + @Path("/workerError/{taskId}") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + public Response httpPostWorkerError( + final MSQErrorReport errorReport, + @PathParam("taskId") final String taskId, Review Comment: ## Useless parameter The parameter 'taskId' is never used. [Show more
Re: [PR] [Backport] Adding null check to earliest and latest aggs (druid)
cryptoe commented on PR #16164: URL: https://github.com/apache/druid/pull/16164#issuecomment-2006697728 @LakshSingla The fix in #16165 would not resolve the CI failure in this PR:
```
Error: Failures:
Error: org.apache.druid.java.util.emitter.core.HttpEmitterTest.timeoutEmptyQueue
Error:Run 1: HttpEmitterTest.timeoutEmptyQueue:82 Expected: a value less than <0.0> but: <2.0> was greater than <0.0>
Error:Run 2: HttpEmitterTest.timeoutEmptyQueue:82 Expected: a value less than <0.0> but: <2.0> was greater than <0.0>
Error:Run 3: HttpEmitterTest.timeoutEmptyQueue:82 Expected: a value less than <0.0> but: <2.0> was greater than <0.0>
Error:Run 4: HttpEmitterTest.timeoutEmptyQueue:82 Expected: a value less than <0.0> but: <2.0> was greater than <0.0>
```
Re: [I] Window functions on MSQ for high and low cardinality data (druid)
cryptoe commented on issue #16126: URL: https://github.com/apache/druid/issues/16126#issuecomment-2006686677 Base PR: https://github.com/apache/druid/pull/15470
Re: [PR] [Backport] Fix incorrect header names for certain export queries (#16096) (druid)
cryptoe commented on PR #16167: URL: https://github.com/apache/druid/pull/16167#issuecomment-2006649160 Backport of https://github.com/apache/druid/pull/16096
(druid) branch 29.0.1 updated: [Backport] Handling latest_by and earliest_by on numeric columns correctly #15939
This is an automated email from the ASF dual-hosted git repository. karan pushed a commit to branch 29.0.1 in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/29.0.1 by this push: new 44d0d3245c0 [Backport] Handling latest_by and earliest_by on numeric columns correctly #15939 44d0d3245c0 is described below commit 44d0d3245c063e3c9d7a3805632fa1295143b115 Author: Karan Kumar AuthorDate: Tue Mar 19 15:31:49 2024 +0530 [Backport] Handling latest_by and earliest_by on numeric columns correctly #15939 Co-authored-by: Soumyava <93540295+somu-im...@users.noreply.github.com> --- .../aggregation/first/NumericFirstAggregator.java | 8 +- .../first/NumericFirstBufferAggregator.java| 8 +- .../aggregation/last/NumericLastAggregator.java| 9 +- .../last/NumericLastBufferAggregator.java | 8 +- .../apache/druid/sql/calcite/CalciteQueryTest.java | 508 +++-- 5 files changed, 286 insertions(+), 255 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java index b3092377b57..6b32996b4f2 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java @@ -62,10 +62,6 @@ public abstract class NumericFirstAggregator implements Aggregator @Override public void aggregate() { -if (timeSelector.isNull()) { - return; -} - if (needsFoldCheck) { final Object object = valueSelector.getObject(); if (object instanceof SerializablePair) { @@ -84,6 +80,10 @@ public abstract class NumericFirstAggregator implements Aggregator } } +if (timeSelector.isNull()) { + return; +} + long time = timeSelector.getLong(); if (time < firstTime) { firstTime = time; diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java index 4531ee71bcd..f20456d3122 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java @@ -97,10 +97,6 @@ public abstract class NumericFirstBufferAggregator implements BufferAggregator @Override public void aggregate(ByteBuffer buf, int position) { -if (timeSelector.isNull()) { - return; -} - long firstTime = buf.getLong(position); if (needsFoldCheck) { final Object object = valueSelector.getObject(); @@ -117,6 +113,10 @@ public abstract class NumericFirstBufferAggregator implements BufferAggregator } } +if (timeSelector.isNull()) { + return; +} + long time = timeSelector.getLong(); if (time < firstTime) { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java index 159939450ee..50d4470fa54 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java @@ -61,10 +61,6 @@ public abstract class NumericLastAggregator implements Aggregator @Override public void aggregate() { -if (timeSelector.isNull()) { - return; -} - if (needsFoldCheck) { final Object object = valueSelector.getObject(); if (object instanceof SerializablePair) { @@ -83,6 +79,11 @@ public abstract class NumericLastAggregator implements Aggregator return; } } + +if (timeSelector.isNull()) { + return; +} + long time = timeSelector.getLong(); if (time >= lastTime) { lastTime = time; diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java index 9de6f996887..2ba15a7929d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java @@ -100,10 +100,6 @@ public abstract class NumericLastBufferAggregator implements BufferAggregator @Override public void aggregate(ByteBuffer buf, int position) { -if (timeSelector.isNull()) { - return; -} - long lastTime = buf.getLong(position); if (needsFoldCheck) { final Object object = valueSelector.getObject(); @@
Re: [PR] [Backport] Handling latest_by and earliest_by on numeric columns correctly #15939 (druid)
cryptoe merged PR #16163: URL: https://github.com/apache/druid/pull/16163
Re: [PR] Record column name for exceptions while writing frames in RowBasedFrameWriter (druid)
cryptoe commented on code in PR #16130: URL: https://github.com/apache/druid/pull/16130#discussion_r1530055165 ## processing/src/main/java/org/apache/druid/frame/write/FrameFieldWriterException.java: ## Review Comment: I think we can make an MSQ fault out of this. That way we do not have to adjust any retry logic.
Re: [PR] Record column name for exceptions while writing frames in RowBasedFrameWriter (druid)
cryptoe commented on code in PR #16130: URL: https://github.com/apache/druid/pull/16130#discussion_r1530054071 ## processing/src/main/java/org/apache/druid/frame/write/InvalidNullByteException.java: ## @@ -53,17 +50,16 @@ private InvalidNullByteException( @Nullable final Integer position ) { -super(StringUtils.format( +super(column, StringUtils.format( Review Comment: +1
[PR] MSQ controller: Support in-memory shuffles; towards JVM reuse. (druid)
gianm opened a new pull request, #16168: URL: https://github.com/apache/druid/pull/16168

This patch contains two controller changes that make progress towards a lower-latency MSQ. These are both larger changes, but I developed them together and it was most straightforward to put them into a single PR rather than separating into multiple PRs, especially given they both involved changes in common files like ControllerImpl and ControllerContext.

**Key classes:** These files are the most interesting IMO.

- Controller
- ControllerImpl
- ControllerQueryKernel
- ControllerQueryKernelConfig
- ControllerUtils (especially computeStageGroups)
- ControllerContext
- MSQWorkerTaskLauncher

**Key changes:**

First, support for **in-memory shuffles**. The main feature of in-memory shuffles, as far as the controller is concerned, is that they are not fully buffered. That means that whenever a producer stage uses in-memory output, its consumer must run concurrently. The controller determines which stages run concurrently, and when they start and stop.

"Leapfrogging" allows any chain of sort-based stages to use in-memory shuffles even if we can only run two stages at once. For example, in a linear chain of stages 0 -> 1 -> 2 where all do sort-based shuffles, we can use in-memory shuffling for each one while only running two at once. (When stage 1 is done reading input and about to start writing its output, we can stop 0 and start 2.)

1) New `OutputChannelMode` enum attached to `WorkOrder` that tells workers whether stage output should be in memory (`MEMORY`), or use local or durable storage.
2) New logic in the `ControllerQueryKernel` to determine which stages can use in-memory shuffling (`ControllerUtils#computeStageGroups`) and to launch them at the appropriate time (`ControllerQueryKernel#createNewKernels`).
3) New `doneReadingInput` method on `Controller` (passed down to the stage kernels) which allows stages to transition to `POST_READING` even if they are not gathering statistics. This is important because it enables "leapfrogging" for `HASH_LOCAL_SORT` shuffles, and for `GLOBAL_SORT` shuffles with 1 partition.
4) Moved result-reading from `ControllerContext#writeReports` to new `QueryListener` interface, which `ControllerImpl` feeds results to row-by-row while the query is still running. Important so we can read query results from the final stage using an in-memory channel.
5) New class `ControllerQueryKernelConfig` holds configs that control kernel behavior (such as whether to pipeline, maximum number of concurrent stages, etc). Generated by the `ControllerContext`.

Second, a refactor **towards running workers in persistent JVMs** that are able to cache data across queries. This is helpful because I believe we'll want to reuse JVMs and cached data for latency reasons.

1) Move creation of `WorkerManager` and `TableInputSpecSlicer` to the `ControllerContext`, rather than `ControllerImpl`. This allows managing workers and work assignment differently when JVMs are reusable.
2) Lift the Controller Jersey resource out from `ControllerChatHandler` to a reusable resource `ControllerResource`.
3) Move memory introspection to a `MemoryIntrospector` interface, and introduce `ControllerMemoryParameters` that uses it. This makes it easier to run MSQ in process types other than Indexer and Peon.

Both of these areas will have follow-ups that make similar changes on the worker side.
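The "leapfrogging" order described in PR #16168 (a linear chain of sort-based stages with at most two running at once) can be illustrated with a small standalone sketch. This is not Druid code: `LeapfrogSketch`, `schedule`, and `maxConcurrent` are hypothetical names, and the event strings are only a way to make the ordering visible. It assumes every stage in the chain uses in-memory output, so each consumer has to start before its producer stops.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Druid code): start/stop ordering for a linear
 * chain of sort-based stages when at most two stages may run at once.
 */
final class LeapfrogSketch
{
  /** Returns the ordered events for stages 0..numStages-1. */
  public static List<String> schedule(int numStages)
  {
    final List<String> events = new ArrayList<>();
    if (numStages <= 0) {
      return events;
    }
    events.add("start 0");
    for (int stage = 1; stage < numStages; stage++) {
      // A consumer of in-memory output must run concurrently with its producer.
      events.add("start " + stage);
      // Once the consumer has read all of its input, the producer can stop,
      // which frees a slot for the next stage on the following iteration.
      events.add("doneReadingInput " + stage);
      events.add("stop " + (stage - 1));
    }
    events.add("stop " + (numStages - 1));
    return events;
  }

  /** Counts the largest number of stages running at the same time. */
  public static int maxConcurrent(List<String> events)
  {
    int running = 0;
    int max = 0;
    for (String event : events) {
      if (event.startsWith("start ")) {
        running++;
        max = Math.max(max, running);
      } else if (event.startsWith("stop ")) {
        running--;
      }
    }
    return max;
  }

  public static void main(String[] args)
  {
    final List<String> events = schedule(3);
    events.forEach(System.out::println);
    System.out.println("max concurrent = " + maxConcurrent(events));
  }
}
```

For the 0 -> 1 -> 2 chain, stage 0 stops and stage 2 starts right after stage 1 reports that it is done reading input, so no more than two stages are ever running together, regardless of chain length.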
[PR] [Backport] Fix incorrect header names for certain export queries (#16096) (druid)
adarshsanjeev opened a new pull request, #16167: URL: https://github.com/apache/druid/pull/16167 Backport of https://github.com/apache/druid/commit/a151bcfd12e86b3ace7a9776cb7bc1e87ea2a017
[I] Lookup is not found when using as a part of filtered aggregation (druid)
irinaJT opened a new issue, #16166: URL: https://github.com/apache/druid/issues/16166

Lookup is not found when using as a part of filtered aggregation

### Affected Version

28.0.1

### Description

Some queries that worked in an earlier Druid version (26.0.0) throw an error in version 28. This happens when querying an aggregation that uses a lookup as a filter. Here is an example of a problematic aggregation:

```
{
  "type": "filtered",
  "filter": {
    "type": "selector",
    "dimension": "country_id",
    "value": "FR",
    "extractionFn": {
      "type": "cascade",
      "extractionFns": [
        { "type": "registeredLookup", "lookup": "countries_iso2" },
        { "type": "lower" }
      ]
    }
  },
  "aggregator": {
    "type": "doubleSum",
    "name": "total",
    "fieldName": "total"
  }
}
```

The error is:

```
Error: undefined
Cannot construct instance of `org.apache.druid.query.aggregation.FilteredAggregatorFactory`, problem: Lookup [countries_iso2] not found at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 1343] (through reference chain: org.apache.druid.query.timeseries.TimeseriesQuery["aggregations"]->java.util.ArrayList[1])
com.fasterxml.jackson.databind.exc.ValueInstantiationException
```

However, the lookup is present, and if I use the same filter as part of the query filter, it works without any issues. I am not sure whether this is a configuration issue (I didn't find anything in the docs about limitations on using lookups in filtered aggregations) or a bug in the latest updates.
(druid) branch master updated: Fix incorrect header names for certain export queries (#16096)
This is an automated email from the ASF dual-hosted git repository. adarshsanjeev pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/master by this push: new a151bcfd12e Fix incorrect header names for certain export queries (#16096) a151bcfd12e is described below commit a151bcfd12e86b3ace7a9776cb7bc1e87ea2a017 Author: Adarsh Sanjeev AuthorDate: Tue Mar 19 15:11:04 2024 +0530 Fix incorrect header names for certain export queries (#16096) * Fix incorrect header names for certain queries * Fix incorrect header names for certain queries * Maintain upgrade compatibility * Fix tests * Change null handling --- .../org/apache/druid/msq/exec/ControllerImpl.java | 3 +- .../results/ExportResultsFrameProcessor.java | 52 +++--- .../ExportResultsFrameProcessorFactory.java| 18 +++- .../org/apache/druid/msq/exec/MSQExportTest.java | 108 +++-- .../ExportResultsFrameProcessorFactoryTest.java| 52 ++ 5 files changed, 183 insertions(+), 50 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index c71941f5c07..e9d71239940 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1905,7 +1905,8 @@ public class ControllerImpl implements Controller .processorFactory(new ExportResultsFrameProcessorFactory( queryId, exportStorageProvider, - resultFormat + resultFormat, + columnMappings )) ); return builder.build(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java index 
de65d3e9d7a..52697578b07 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java @@ -21,6 +21,8 @@ package org.apache.druid.msq.querykit.results; import com.fasterxml.jackson.databind.ObjectMapper; import it.unimi.dsi.fastutil.ints.IntSet; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -35,13 +37,14 @@ import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.msq.counters.ChannelCounters; -import org.apache.druid.msq.querykit.QueryKitUtils; import org.apache.druid.msq.util.SequenceUtils; import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.planner.ColumnMapping; +import org.apache.druid.sql.calcite.planner.ColumnMappings; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.storage.StorageConnector; @@ -60,6 +63,8 @@ public class ExportResultsFrameProcessor implements FrameProcessor private final ObjectMapper jsonMapper; private final ChannelCounters channelCounter; final String exportFilePath; + private final Object2IntMap outputColumnNameToFrameColumnNumberMap; + private final RowSignature exportRowSignature; public ExportResultsFrameProcessor( final ReadableFrameChannel inputChannel, @@ -68,7 +73,8 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor final StorageConnector storageConnector, final ObjectMapper jsonMapper, final ChannelCounters channelCounter, - final String exportFilePath + final String exportFilePath, + final ColumnMappings columnMappings ) { this.inputChannel = inputChannel; @@ -78,6 +84,30 @@ public class ExportResultsFrameProcessor implements FrameProcessor this.jsonMapper = jsonMapper; this.channelCounter = channelCounter; this.exportFilePath = exportFilePath; +this.outputColumnNameToFrameColumnNumberMap = new
Re: [PR] Fix incorrect header names for certain export queries (druid)
adarshsanjeev merged PR #16096: URL: https://github.com/apache/druid/pull/16096
Re: [PR] Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building (druid)
findingrish commented on code in PR #15817: URL: https://github.com/apache/druid/pull/15817#discussion_r1530020329 ## server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java: ## @@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE ); } + private void doPollSegmentAndSchema() + { +log.debug("Starting polling of segment and schema table"); + +ConcurrentMap segmentStats = new ConcurrentHashMap<>(); + +// some databases such as PostgreSQL require auto-commit turned off +// to stream results back, enabling transactions disables auto-commit +// +// setting connection to read-only will allow some database such as MySQL +// to automatically use read-only transaction mode, further optimizing the query +final List segments = connector.inReadOnlyTransaction( +new TransactionCallback>() +{ + @Override + public List inTransaction(Handle handle, TransactionStatus status) + { +return handle +.createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable())) +.setFetchSize(connector.getStreamingFetchSize()) +.map( +new ResultSetMapper() +{ + @Override + public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException + { +try { + DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class); + segmentStats.put( + segment.getId(), + new SegmentSchemaCache.SegmentStats( + (Long) r.getObject("schema_id"), + (Long) r.getObject("num_rows") + ) + ); + return replaceWithExistingSegmentIfPresent(segment); +} +catch (IOException e) { + log.makeAlert(e, "Failed to read segment from db.").emit(); + // If one entry in database is corrupted doPoll() should continue to work overall. See + // filter by `Objects::nonNull` below in this method. 
+ return null; +} + } +} +) +.list(); + } +} +); + +Map schemaMap = new HashMap<>(); + +String schemaPollQuery; +if (latestSegmentSchemaPoll == null) { + schemaPollQuery = StringUtils.format("SELECT id, payload, created_date FROM %s", getSegmentSchemaTable()); +} else { + schemaPollQuery = StringUtils.format( + "SELECT id, payload, created_date FROM %1$s where created_date > '%2$s'", Review Comment: I think I can update the query to filter on the auto-increment `id` column to fix this problem.
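To make the concern in this review thread concrete, here is a minimal in-memory sketch contrasting a `created_date >` watermark with an `id >` watermark. It is only an illustration under stated assumptions: `SchemaPollSketch`, `SchemaRow`, `pollByCreatedDate`, and `pollById` are hypothetical names, a plain list stands in for the schema table, and timestamps are reduced to milliseconds. It does not reproduce the PR's actual query or fix.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch (not Druid code) of two incremental-poll watermarks. */
final class SchemaPollSketch
{
  /** Stand-in for one row of the segment schema table. */
  static final class SchemaRow
  {
    final long id;            // stand-in for the auto-increment id column
    final long createdMillis; // stand-in for created_date
    final String payload;

    SchemaRow(long id, long createdMillis, String payload)
    {
      this.id = id;
      this.createdMillis = createdMillis;
      this.payload = payload;
    }
  }

  /** Mirrors a "created_date > lastPoll" condition. */
  public static List<SchemaRow> pollByCreatedDate(List<SchemaRow> table, long lastCreatedMillis)
  {
    final List<SchemaRow> result = new ArrayList<>();
    for (SchemaRow row : table) {
      if (row.createdMillis > lastCreatedMillis) {
        result.add(row);
      }
    }
    return result;
  }

  /** Mirrors an "id > lastSeenId" condition. */
  public static List<SchemaRow> pollById(List<SchemaRow> table, long lastSeenId)
  {
    final List<SchemaRow> result = new ArrayList<>();
    for (SchemaRow row : table) {
      if (row.id > lastSeenId) {
        result.add(row);
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    final List<SchemaRow> table = new ArrayList<>();
    table.add(new SchemaRow(1, 1000L, "schema-a"));
    // The first poll has seen row 1, so the watermarks are createdMillis=1000 and id=1.
    // A second row then arrives with the same timestamp as the first.
    table.add(new SchemaRow(2, 1000L, "schema-b"));

    System.out.println("by created_date: " + pollByCreatedDate(table, 1000L).size());
    System.out.println("by id: " + pollById(table, 1L).size());
  }
}
```

In this toy setup the timestamp watermark skips the second row because its timestamp equals the watermark, while the id watermark picks it up. An id watermark may still have its own caveat when a transaction that was assigned a lower id commits after a higher id has already been observed; the sketch does not attempt to address that case.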
Re: [PR] [Backport] Adding null check to earliest and latest aggs (druid)
LakshSingla commented on PR #16164: URL: https://github.com/apache/druid/pull/16164#issuecomment-2006436394 https://github.com/apache/druid/pull/16165 needs to be backported as well, to fix the broken test.
[PR] Fix FloatFirstVectorAggregationTest (druid)
LakshSingla opened a new pull request, #16165: URL: https://github.com/apache/druid/pull/16165 Fixes the broken `FloatFirstVectorAggregationTest` in the master branch.
(druid) branch master updated: Revert "Fix FloatFirstVectorAggregationTest"
This is an automated email from the ASF dual-hosted git repository. lakshsingla pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git The following commit(s) were added to refs/heads/master by this push: new f5c5573da89 Revert "Fix FloatFirstVectorAggregationTest" f5c5573da89 is described below commit f5c5573da89a924875e209dc14b5ac7019e0822f Author: Laksh Singla AuthorDate: Tue Mar 19 14:39:09 2024 +0530 Revert "Fix FloatFirstVectorAggregationTest" This reverts commit c40bb2c8f071043a87ede54d9dc56d024774ccfd. --- .../query/aggregation/first/FloatFirstVectorAggregationTest.java | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java index a95e89bd32a..28044de5e4e 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstVectorAggregationTest.java @@ -64,7 +64,9 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest new SerializablePairLongFloat(2345300L, 4.2F) }; - private final SerializablePairLongFloat[] nullPairs = {null, null, null, null}; + private final SerializablePairLongFloat[] null_pairs = { + null, null, null, null + }; @@ -128,7 +130,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest @Override public Object[] getObjectVector() { -return nullPairs; +return null_pairs; } @Override @@ -285,7 +287,7 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest target1.aggregate(buf, 0, 0, VALUES.length); Pair result = (Pair) target1.get(buf, 0); Assert.assertEquals(Long.MAX_VALUE, result.lhs.longValue()); -Assert.assertEquals(NullHandling.defaultFloatValue(), result.rhs); 
+Assert.assertNull(result.rhs); } @Test
Re: [PR] Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building (druid)
findingrish commented on code in PR #15817: URL: https://github.com/apache/druid/pull/15817#discussion_r1529975659 ## server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java: ## @@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE ); } + private void doPollSegmentAndSchema() + { +log.debug("Starting polling of segment and schema table"); + +ConcurrentMap segmentStats = new ConcurrentHashMap<>(); + +// some databases such as PostgreSQL require auto-commit turned off +// to stream results back, enabling transactions disables auto-commit +// +// setting connection to read-only will allow some database such as MySQL +// to automatically use read-only transaction mode, further optimizing the query +final List segments = connector.inReadOnlyTransaction( +new TransactionCallback>() +{ + @Override + public List inTransaction(Handle handle, TransactionStatus status) + { +return handle +.createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable())) +.setFetchSize(connector.getStreamingFetchSize()) +.map( +new ResultSetMapper() +{ + @Override + public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException + { +try { + DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class); + segmentStats.put( + segment.getId(), + new SegmentSchemaCache.SegmentStats( + (Long) r.getObject("schema_id"), + (Long) r.getObject("num_rows") + ) + ); + return replaceWithExistingSegmentIfPresent(segment); +} +catch (IOException e) { + log.makeAlert(e, "Failed to read segment from db.").emit(); + // If one entry in database is corrupted doPoll() should continue to work overall. See + // filter by `Objects::nonNull` below in this method. 
+ return null; +} + } +} +) +.list(); + } +} +); + +Map schemaMap = new HashMap<>(); + +String schemaPollQuery; +if (latestSegmentSchemaPoll == null) { + schemaPollQuery = StringUtils.format("SELECT id, payload, created_date FROM %s", getSegmentSchemaTable()); +} else { + schemaPollQuery = StringUtils.format( + "SELECT id, payload, created_date FROM %1$s where created_date > '%2$s'", Review Comment: Good point, it is possible that the poll could miss schemas created at the same time. Actually, the poll could miss schemas created earlier as well. Say a transaction with timestamp t < created_date failed and was still retrying when the poll happened: we would miss those schemas in the next poll even if the poll condition were `greater than or equal to`.
Re: [I] Druid prometheus-emitter does not emit all the metrics (druid)
tapojit047 commented on issue #16148: URL: https://github.com/apache/druid/issues/16148#issuecomment-2006401443 So, I need to add the appropriate metrics in here. Right? @suneet-s https://github.com/apache/druid/blob/master/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
(druid) branch master updated (55c47fbcfd9 -> c40bb2c8f07)
This is an automated email from the ASF dual-hosted git repository. lakshsingla pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/druid.git from 55c47fbcfd9 MSQ: Fix NPE in getWorkerStats(). (#16159) add c40bb2c8f07 Fix FloatFirstVectorAggregationTest No new revisions were added by this update. Summary of changes: .../query/aggregation/first/FloatFirstVectorAggregationTest.java | 8 +++- 1 file changed, 3 insertions(+), 5 deletions(-)
Re: [PR] Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building (druid)
findingrish commented on code in PR #15817: URL: https://github.com/apache/druid/pull/15817#discussion_r1529964957 ## server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java: ## @@ -2245,29 +2376,41 @@ private void insertIntoUpgradeSegmentsTable( } } - private List retrieveSegmentsById(Handle handle, String datasource, Set segmentIds) + private List retrieveSegmentsById(Handle handle, String datasource, Set segmentIds) { if (segmentIds.isEmpty()) { return Collections.emptyList(); } return SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveSegmentsById(datasource, segmentIds) + .retrieveSegmentsById(datasource, segmentIds, centralizedDatasourceSchemaConfig.isEnabled()) .stream() .map(DataSegmentPlus::getDataSegment) +.map(v -> new DataSegmentWithSchemaInformation(v, null, null)) Review Comment: Thanks for pointing it out. It is a bug, probably introduced while merging with master.
Re: [PR] MSQ: Fix NPE in getWorkerStats(). (druid)
cryptoe merged PR #16159: URL: https://github.com/apache/druid/pull/16159
(druid) branch master updated (8ee324c7e73 -> 55c47fbcfd9)
This is an automated email from the ASF dual-hosted git repository.

karan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git

    from 8ee324c7e73 MSQ: Cancel workers more quickly. (#16158)
     add 55c47fbcfd9 MSQ: Fix NPE in getWorkerStats(). (#16159)

No new revisions were added by this update.

Summary of changes:
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java | 21 +
 1 file changed, 9 insertions(+), 12 deletions(-)
Re: [PR] MSQ: Cancel workers more quickly. (druid)
cryptoe merged PR #16158: URL: https://github.com/apache/druid/pull/16158
(druid) branch master updated: MSQ: Cancel workers more quickly. (#16158)
This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git

The following commit(s) were added to refs/heads/master by this push:
     new 8ee324c7e73 MSQ: Cancel workers more quickly. (#16158)
8ee324c7e73 is described below

commit 8ee324c7e73ef6c64c55f0b2b93bf3e198528234
Author: Gian Merlino
AuthorDate: Tue Mar 19 01:51:22 2024 -0700

    MSQ: Cancel workers more quickly. (#16158)

    Prior to this patch, when canceled, workers would keep trying to contact
    the controller: they would attempt to report an error, and if they were in
    the midst of some other call (like a counters push) they would keep trying it.

    This can cause cancellation to be delayed, because the controller shuts down
    its HTTP server before it cancels workers. Workers are then stuck retrying
    calls to the controller that will never succeed. The retry loops are broken
    when the controller gives up on them (one minute later) and exits for real.
    Then, the controller failure detection logic on the worker detects that the
    controller has failed, and the worker finally shuts down.

    This patch speeds up worker cancellation by bypassing communication with
    the controller. There is no real need for it. If the controller canceled the
    workers, it isn't interested in further communications from them. If the
    workers were canceled out-of-band, the controller can detect this through
    worker monitoring and report it as a WorkerFailed error.
---
 .../apache/druid/msq/exec/ControllerClient.java    |  4 +++
 .../java/org/apache/druid/msq/exec/WorkerImpl.java | 39 +-
 2 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
index b3675f0e047..afd1ece4dad 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerClient.java
@@ -73,8 +73,12 @@ public interface ControllerClient extends AutoCloseable
   void postWorkerWarning(
       List MSQErrorReports
   ) throws IOException;
+
   List getTaskList() throws IOException;
 
+  /**
+   * Close this client. Idempotent.
+   */
   @Override
   void close();
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 3f2ef39b5bf..e76169f7042 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -498,20 +498,16 @@ public class WorkerImpl implements Worker
   @Override
   public void stopGracefully()
   {
-    log.info("Stopping gracefully for taskId [%s]", task.getId());
-    kernelManipulationQueue.add(
-        kernel -> {
-          // stopGracefully() is called when the containing process is terminated, or when the task is canceled.
-          throw new MSQException(CanceledFault.INSTANCE);
-        }
-    );
+    // stopGracefully() is called when the containing process is terminated, or when the task is canceled.
+    log.info("Worker task[%s] canceled.", task.getId());
+    doCancel();
   }
 
   @Override
   public void controllerFailed()
   {
-    controllerAlive = false;
-    stopGracefully();
+    log.info("Controller task[%s] for worker task[%s] failed. Canceling.", task.getControllerTaskId(), task.getId());
+    doCancel();
   }
 
   @Override
@@ -909,6 +905,31 @@ public class WorkerImpl implements Worker
     }
   }
 
+  /**
+   * Called by {@link #stopGracefully()} (task canceled, or containing process shut down) and
+   * {@link #controllerFailed()}.
+   */
+  private void doCancel()
+  {
+    // Set controllerAlive = false so we don't try to contact the controller after being canceled. If it canceled us,
+    // it doesn't need to know that we were canceled. If we were canceled by something else, the controller will
+    // detect this as part of its monitoring of workers.
+    controllerAlive = false;
+
+    // Close controller client to cancel any currently in-flight calls to the controller.
+    if (controllerClient != null) {
+      controllerClient.close();
+    }
+
+    // Clear the main loop event queue, then throw a CanceledFault into the loop to exit it promptly.
+    kernelManipulationQueue.clear();
+    kernelManipulationQueue.add(
+        kernel -> {
+          throw new MSQException(CanceledFault.INSTANCE);
+        }
+    );
+  }
+
   /**
    * Log (at DEBUG level) a string explaining the status of all work assigned to this worker.
    */
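The cancellation pattern described in the commit message above (stop talking to the controller, close the client, clear the event queue, then inject a command that throws) can be sketched in isolation. This is a minimal illustration, not Druid's `WorkerImpl`: `CancelSketch`, `Command`, `FakeControllerClient`, and `CanceledException` are hypothetical stand-ins, and the loop drains a plain queue rather than a real kernel manipulation queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a worker with a command-driven main loop.
class CancelSketch
{
  // Stand-in for the exception that a canceled fault would raise.
  static class CanceledException extends RuntimeException {}

  // Stand-in for a command applied by the main loop.
  interface Command
  {
    void run();
  }

  // Stand-in for a controller client whose close() aborts in-flight calls.
  static class FakeControllerClient
  {
    boolean closed = false;

    void close()
    {
      closed = true;
    }
  }

  final Queue<Command> queue = new ConcurrentLinkedQueue<>();
  final FakeControllerClient client = new FakeControllerClient();
  volatile boolean controllerAlive = true;
  int commandsRun = 0;

  void doCancel()
  {
    // Stop any further attempts to contact the controller.
    controllerAlive = false;
    // Abort calls that are already in flight.
    client.close();
    // Drop pending work, then enqueue a command that makes the loop exit.
    queue.clear();
    queue.add(() -> {
      throw new CanceledException();
    });
  }

  // Runs queued commands; returns true if the loop exited due to cancellation.
  boolean runLoop()
  {
    try {
      Command command;
      while ((command = queue.poll()) != null) {
        command.run();
        commandsRun++;
      }
      return false;
    }
    catch (CanceledException e) {
      return true;
    }
  }

  // Queues two no-op commands, cancels, and checks that neither command ran.
  static boolean demo()
  {
    CancelSketch worker = new CancelSketch();
    worker.queue.add(() -> {});
    worker.queue.add(() -> {});
    worker.doCancel();
    boolean canceled = worker.runLoop();
    return canceled && worker.commandsRun == 0 && worker.client.closed && !worker.controllerAlive;
  }
}
```

Clearing the queue before adding the throwing command is what makes the exit prompt in this sketch: without the `clear()`, the loop would first work through every command queued before cancellation.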
Re: [PR] Window function on msq (druid)
cryptoe commented on code in PR #15470: URL: https://github.com/apache/druid/pull/15470#discussion_r1529890568 ## processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.rowsandcols.concrete; + +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.read.columnar.FrameColumnReaders; +import org.apache.druid.frame.segment.FrameStorageAdapter; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.semantic.WireTransferable; +import org.apache.druid.segment.CloseableShapeshifter; +import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.LinkedHashMap; + +public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +{ + private final Frame frame; + private final RowSignature signature; + private final LinkedHashMap colCache = new LinkedHashMap<>(); + + public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature) + { +this.frame = FrameType.COLUMNAR.ensureType(frame); +this.signature = signature; + } + + @Override + public Collection getColumnNames() + { +return signature.getColumnNames(); + } + + @Override + public int numRows() + { +return frame.numRows(); + } + + @Nullable + @Override + public Column findColumn(String name) + { +// Use contains so that we can negative cache. 
+if (!colCache.containsKey(name)) { + final int columnIndex = signature.indexOf(name); + if (columnIndex < 0) { +colCache.put(name, null); + } else { +final ColumnType columnType = signature +.getColumnType(columnIndex) +.orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); + +colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); + } +} +return colCache.get(name); + + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public T as(Class clazz) + { +if (StorageAdapter.class.equals(clazz)) { + return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); +} +if (WireTransferable.class.equals(clazz)) { Review Comment: Why would this be wire transferable. I am asking since I am unfamiliar with this space. ## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java: ## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.querykit; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.frame.key.ClusterBy; +import org.apache.druid.frame.key.KeyColumn; +import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.msq.input.stage.StageInputSpec; +import
[PR] [Backport] Adding null check to earliest and latest aggs (druid)
cryptoe opened a new pull request, #16164: URL: https://github.com/apache/druid/pull/16164 Backport of #15972 to 29.0.1.
[PR] [Backport] Handling latest_by and earliest_by on numeric columns correctly #15939 (druid)
cryptoe opened a new pull request, #16163: URL: https://github.com/apache/druid/pull/16163 Handling latest_by and earliest_by on numeric columns correctly #15939
Re: [PR] Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building (druid)
AmatyaAvadhanula commented on code in PR #15817: URL: https://github.com/apache/druid/pull/15817#discussion_r1529820832 ## server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java: ## @@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE ); } + private void doPollSegmentAndSchema() + { +log.debug("Starting polling of segment and schema table"); + +ConcurrentMap segmentStats = new ConcurrentHashMap<>(); + +// some databases such as PostgreSQL require auto-commit turned off +// to stream results back, enabling transactions disables auto-commit +// +// setting connection to read-only will allow some database such as MySQL +// to automatically use read-only transaction mode, further optimizing the query +final List segments = connector.inReadOnlyTransaction( +new TransactionCallback>() +{ + @Override + public List inTransaction(Handle handle, TransactionStatus status) + { +return handle +.createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable())) +.setFetchSize(connector.getStreamingFetchSize()) +.map( +new ResultSetMapper() +{ + @Override + public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException + { +try { + DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class); + segmentStats.put( + segment.getId(), + new SegmentSchemaCache.SegmentStats( + (Long) r.getObject("schema_id"), + (Long) r.getObject("num_rows") + ) + ); + return replaceWithExistingSegmentIfPresent(segment); +} +catch (IOException e) { + log.makeAlert(e, "Failed to read segment from db.").emit(); + // If one entry in database is corrupted doPoll() should continue to work overall. See + // filter by `Objects::nonNull` below in this method. 
+ return null; +} + } +} +) +.list(); + } +} +); + +Map schemaMap = new HashMap<>(); + +String schemaPollQuery; +if (latestSegmentSchemaPoll == null) { + schemaPollQuery = StringUtils.format("SELECT id, payload, created_date FROM %s", getSegmentSchemaTable()); +} else { + schemaPollQuery = StringUtils.format( + "SELECT id, payload, created_date FROM %1$s where created_date > '%2$s'", Review Comment: Can distinct schemas not have the same created_date?
Re: [PR] Associate pending segments with the tasks that requested them (druid)
AmatyaAvadhanula commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1529818253

## indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java: ##
@@ -387,7 +399,7 @@ public LockResult tryLock(final Task task, final LockRequest request)
       if (request instanceof LockRequestForNewSegment) {
         final LockRequestForNewSegment lockRequestForNewSegment = (LockRequestForNewSegment) request;
         if (lockRequestForNewSegment.getGranularity() == LockGranularity.SEGMENT) {
-          newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion());
+          newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion(), null);

Review Comment:
I suppose we could. My understanding of segment locking is limited, which is why I wanted to leave the behaviour intact if possible.
Re: [PR] Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building (druid)
findingrish commented on code in PR #15817: URL: https://github.com/apache/druid/pull/15817#discussion_r1529817709 ## server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java: ## @@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE ); } + private void doPollSegmentAndSchema() + { +log.debug("Starting polling of segment and schema table"); + +ConcurrentMap segmentStats = new ConcurrentHashMap<>(); + +// some databases such as PostgreSQL require auto-commit turned off +// to stream results back, enabling transactions disables auto-commit +// +// setting connection to read-only will allow some database such as MySQL +// to automatically use read-only transaction mode, further optimizing the query +final List segments = connector.inReadOnlyTransaction( +new TransactionCallback>() +{ + @Override + public List inTransaction(Handle handle, TransactionStatus status) + { +return handle +.createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable())) +.setFetchSize(connector.getStreamingFetchSize()) +.map( +new ResultSetMapper() +{ + @Override + public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException + { +try { + DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class); + segmentStats.put( + segment.getId(), + new SegmentSchemaCache.SegmentStats( + (Long) r.getObject("schema_id"), + (Long) r.getObject("num_rows") + ) + ); + return replaceWithExistingSegmentIfPresent(segment); +} +catch (IOException e) { + log.makeAlert(e, "Failed to read segment from db.").emit(); + // If one entry in database is corrupted doPoll() should continue to work overall. See + // filter by `Objects::nonNull` below in this method. 
+ return null; +} + } +} +) +.list(); + } +} +); + +Map schemaMap = new HashMap<>(); + +String schemaPollQuery; +if (latestSegmentSchemaPoll == null) { + schemaPollQuery = StringUtils.format("SELECT id, payload, created_date FROM %s", getSegmentSchemaTable()); +} else { + schemaPollQuery = StringUtils.format( + "SELECT id, payload, created_date FROM %1$s where created_date > '%2$s'", + getSegmentSchemaTable(), + latestSegmentSchemaPoll.toString()); +} +String finalSchemaPollQuery = schemaPollQuery; +final DateTime[] maxCreatedDate = {latestSegmentSchemaPoll}; Review Comment: Using `AtomicReference` instead, since this value is updated inside a lambda.
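As a rough illustration of the change mentioned in the comment above, the sketch below tracks a running maximum from inside a lambda with `AtomicReference` rather than a one-element array. It is not the PR's code: `MaxInLambdaSketch` and `maxCreated` are hypothetical names, and timestamps are plain `Long` epoch milliseconds instead of Joda `DateTime` so that the example stays self-contained.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper; not part of Druid.
class MaxInLambdaSketch
{
  // Returns the larger of `previous` and the largest entry in `createdMillis`.
  // `previous` may be null, meaning that no earlier poll has happened.
  static Long maxCreated(Long previous, List<Long> createdMillis)
  {
    // A local captured by a lambda must be effectively final, so the mutable
    // maximum lives inside an AtomicReference instead of a plain variable.
    final AtomicReference<Long> max = new AtomicReference<>(previous);
    createdMillis.forEach(
        ts -> max.accumulateAndGet(ts, (cur, next) -> cur == null || next > cur ? next : cur)
    );
    return max.get();
  }
}
```

A one-element array works the same way mechanically; `AtomicReference` mainly states the intent more clearly and stays safe if the callback ever runs on another thread.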
Re: [PR] Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building (druid)
findingrish commented on code in PR #15817: URL: https://github.com/apache/druid/pull/15817#discussion_r1529817255 ## server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java: ## @@ -1098,6 +1129,146 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE ); } + private void doPollSegmentAndSchema() + { +log.debug("Starting polling of segment and schema table"); + +ConcurrentMap segmentStats = new ConcurrentHashMap<>(); + +// some databases such as PostgreSQL require auto-commit turned off +// to stream results back, enabling transactions disables auto-commit +// +// setting connection to read-only will allow some database such as MySQL +// to automatically use read-only transaction mode, further optimizing the query +final List segments = connector.inReadOnlyTransaction( +new TransactionCallback>() +{ + @Override + public List inTransaction(Handle handle, TransactionStatus status) + { +return handle +.createQuery(StringUtils.format("SELECT payload, schema_id, num_rows FROM %s WHERE used=true", getSegmentsTable())) +.setFetchSize(connector.getStreamingFetchSize()) +.map( +new ResultSetMapper() +{ + @Override + public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException + { +try { + DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class); + segmentStats.put( + segment.getId(), + new SegmentSchemaCache.SegmentStats( + (Long) r.getObject("schema_id"), + (Long) r.getObject("num_rows") + ) + ); + return replaceWithExistingSegmentIfPresent(segment); +} +catch (IOException e) { + log.makeAlert(e, "Failed to read segment from db.").emit(); + // If one entry in database is corrupted doPoll() should continue to work overall. See + // filter by `Objects::nonNull` below in this method. 
+ return null; +} + } +} +) +.list(); + } +} +); + +Map schemaMap = new HashMap<>(); + +String schemaPollQuery; +if (latestSegmentSchemaPoll == null) { + schemaPollQuery = StringUtils.format("SELECT id, payload, created_date FROM %s", getSegmentSchemaTable()); +} else { + schemaPollQuery = StringUtils.format( + "SELECT id, payload, created_date FROM %1$s where created_date > '%2$s'", Review Comment: It should be `greater than`, since the timestamp used is the max creation time of the records fetched in the previous poll cycle. A `greater than or equal to` comparison would fetch one extra record in each poll cycle (except for the first one).
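The trade-off discussed in this thread (strict versus inclusive comparison against the previous poll's maximum `created_date`) can be made concrete with a small in-memory sketch. Everything here is an assumption for illustration: `WatermarkPollSketch`, `Row`, and `poll` are hypothetical, the "table" is a list, and timestamps are epoch milliseconds rather than the string comparison used in the quoted SQL.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of incremental polling by creation time.
class WatermarkPollSketch
{
  static class Row
  {
    final long id;
    final long created;

    Row(long id, long created)
    {
      this.id = id;
      this.created = created;
    }
  }

  // Returns ids of rows newer than the watermark. A null watermark means
  // "first poll, fetch everything". With inclusive = true, rows whose
  // timestamp equals the watermark are returned as well.
  static List<Long> poll(List<Row> table, Long watermark, boolean inclusive)
  {
    final List<Long> ids = new ArrayList<>();
    for (Row row : table) {
      if (watermark == null
          || row.created > watermark
          || (inclusive && row.created == watermark)) {
        ids.add(row.id);
      }
    }
    return ids;
  }
}
```

With rows created at 100 and 200 and a watermark of 200, the strict form returns nothing on the next poll, while the inclusive form re-fetches the row at 200, which is the extra record described above. If a third row is later written with the same timestamp of 200, the strict form never returns it, which is the scenario the earlier question about distinct schemas sharing a `created_date` points at.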
Re: [PR] Associate pending segments with the tasks that requested them (druid)
AmatyaAvadhanula commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1529815278

## indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java: ##
@@ -213,6 +218,13 @@ public int compare(Pair left, Pair right)
           activeTasks.remove(task.getId());
         }
       }
+      activePendingTaskGroupToTaskIds.clear();
+      for (Task task : storedActiveTasks) {
+        if (activeTasks.contains(task.getId()) && task.getPendingSegmentGroup() != null) {

Review Comment:
It can be null for pending segments that were created before these changes. The same goes for parent_id.
Re: [PR] SortMerge join support for IS NOT DISTINCT FROM. (druid)
gianm commented on code in PR #16003:
URL: https://github.com/apache/druid/pull/16003#discussion_r1529811551

## sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java: ##
@@ -3826,13 +3826,16 @@ public void testJoinWithExplicitIsNotDistinctFromCondition(Map q
                 .context(queryContext)
                 .build()
         ),
-        ImmutableList.of(
-            new Object[]{"", ""},
-            new Object[]{"10.1", "10.1"},
-            new Object[]{"2", "2"},
-            new Object[]{"1", "1"},
-            new Object[]{"def", "def"},
-            new Object[]{"abc", "abc"}
+        sortIfSortBased(
+            ImmutableList.of(
+                new Object[]{"", ""},
+                new Object[]{"10.1", "10.1"},
+                new Object[]{"2", "2"},
+                new Object[]{"1", "1"},
+                new Object[]{"def", "def"},
+                new Object[]{"abc", "abc"}
+            ),
+            0
         )
     );
   }

Review Comment:
Good news, `testJoinWithExplicitIsNotDistinctFromCondition` is that test case. This test was passing on `master` because the join was getting switched quietly to broadcast. Now it actually runs as sort-merge, which is why I had to add `sortIfSortBased` (the results are now in a different order).
Re: [PR] SortMerge join support for IS NOT DISTINCT FROM. (druid)
gianm commented on code in PR #16003:
URL: https://github.com/apache/druid/pull/16003#discussion_r1529809764

## processing/src/main/java/org/apache/druid/frame/key/FrameComparisonWidgetImpl.java: ##
@@ -138,21 +139,29 @@ public RowKey readKey(int row)
   }
 
   @Override
-  public boolean isCompletelyNonNullKey(int row)
+  public boolean hasNonNullKeyParts(int row, int[] keyParts)

Review Comment:
Added.
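One plausible reading of the rename from `isCompletelyNonNullKey(int row)` to `hasNonNullKeyParts(int row, int[] keyParts)`, given the PR title, is that only the key parts compared with plain `=` need to be non-null, while parts compared with `IS NOT DISTINCT FROM` may match on null. The sketch below illustrates that idea under this assumption only. `KeyPartsSketch` and `keysMatch` are hypothetical, keys are plain `Object[]` arrays rather than frame rows, and the real method's contract is not shown in this thread.

```java
import java.util.Arrays;

// Hypothetical illustration; not Druid's FrameComparisonWidgetImpl.
class KeyPartsSketch
{
  // True if every listed position of the key holds a non-null value.
  static boolean hasNonNullKeyParts(Object[] key, int[] keyParts)
  {
    for (int part : keyParts) {
      if (key[part] == null) {
        return false;
      }
    }
    return true;
  }

  // Two keys match when the parts joined with `=` are non-null on both
  // sides and all parts are equal. Arrays.equals treats null == null as
  // equal, which is the IS NOT DISTINCT FROM behaviour for the other parts.
  static boolean keysMatch(Object[] left, Object[] right, int[] equalityParts)
  {
    if (!hasNonNullKeyParts(left, equalityParts) || !hasNonNullKeyParts(right, equalityParts)) {
      return false;
    }
    return Arrays.equals(left, right);
  }
}
```

Under this reading, two keys `{"a", null}` match when only position 0 is an `=` part, and do not match when both positions are `=` parts, since SQL equality never matches on null.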
Re: [PR] Upgrade pending segments transactionally (druid)
AmatyaAvadhanula closed pull request #15992: Upgrade pending segments transactionally URL: https://github.com/apache/druid/pull/15992
Re: [PR] Upgrade pending segments transactionally (druid)
AmatyaAvadhanula commented on PR #15992: URL: https://github.com/apache/druid/pull/15992#issuecomment-2005941990 This is being done as part of https://github.com/apache/druid/pull/16144
Re: [PR] Simplify concurrent streaming ingestion with replace (druid)
AmatyaAvadhanula closed pull request #15844: Simplify concurrent streaming ingestion with replace URL: https://github.com/apache/druid/pull/15844