Re: [PR] segment format v10 (druid)
cryptoe merged PR #18880: URL: https://github.com/apache/druid/pull/18880 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] segment format v10 (druid)
github-advanced-security[bot] commented on code in PR #18880:
URL: https://github.com/apache/druid/pull/18880#discussion_r2672093253
##
processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java:
##
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.primitives.Ints;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.data.BitmapSerdeFactory;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * {@link SegmentFileBuilder} for V10 format segments. Right now, this uses a
{@link FileSmoosher} underneath to build
+ * V9 smoosh files and collect the metadata about the offsets in those
containers, and then appends them into the V10
+ * consolidated segment file after the header and {@link SegmentFileMetadata}
is written.
+ *
+ * V10 file format:
+ * | version (byte) | meta compression (byte) | meta length (int) | meta json
| container 0 | ... | container n |
+ */
+public class SegmentFileBuilderV10 implements SegmentFileBuilder
+{
+ public static SegmentFileBuilderV10 create(ObjectMapper jsonMapper, File
baseDir)
+ {
+return create(jsonMapper, baseDir, CompressionStrategy.NONE);
+ }
+
+ public static SegmentFileBuilderV10 create(ObjectMapper jsonMapper, File
baseDir, CompressionStrategy metaCompression)
+ {
+return new SegmentFileBuilderV10(
+jsonMapper,
+IndexIO.V10_FILE_NAME,
+baseDir,
+Integer.MAX_VALUE,
+metaCompression
+);
+ }
+
+ private final ObjectMapper jsonMapper;
+ private final String outputFileName;
+ private final File baseDir;
+ private final long maxChunkSize;
+ private final CompressionStrategy metadataCompression;
+ private final FileSmoosher smoosher;
+ private final Map externalSegmentFileBuilders;
+ private final Map columns = new TreeMap<>();
+
+ @Nullable
+ private String interval = null;
+ @Nullable
+ private BitmapSerdeFactory bitmapEncoding = null;
+ @Nullable
+ private List projections = null;
+
+ private SegmentFileBuilderV10(
+ ObjectMapper jsonMapper,
+ String outputFileName,
+ File baseDir,
+ long maxChunkSize,
+ CompressionStrategy metadataCompression
+ )
+ {
+this.jsonMapper = jsonMapper;
+this.outputFileName = outputFileName;
+this.baseDir = baseDir;
+this.maxChunkSize = maxChunkSize;
+this.metadataCompression = metadataCompression;
+this.smoosher = new FileSmoosher(baseDir, Ints.checkedCast(maxChunkSize),
outputFileName);
+this.externalSegmentFileBuilders = new TreeMap<>();
+ }
+
+ @Override
+ public void add(String name, File fileToAdd) throws IOException
+ {
+smoosher.add(name, fileToAdd);
+ }
+
+ @Override
+ public void add(String name, ByteBuffer bufferToAdd) throws IOException
+ {
+smoosher.add(name, bufferToAdd);
+ }
+
+ @Override
+ public SegmentFileChannel addWithChannel(String name, long size) throws
IOException
+ {
+return smoosher.addWithChannel(name, size);
+ }
+
+ @Override
+ public SegmentFileBuilder getExternalBuilder(String externalFile)
+ {
+return externalSegmentFileBuilders.computeIfAbsent(
+externalFile,
+(k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir,
maxChunkSize, metadataCompression)
+);
+ }
+
+ @Override
+ public void addColumn(String name, ColumnDescriptor columnDescriptor)
+ {
+this.columns.put(name, columnDescriptor);
+
Re: [PR] segment format v10 (druid)
clintropolis commented on code in PR #18880:
URL: https://github.com/apache/druid/pull/18880#discussion_r2672009325
##
processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java:
##
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.column.ColumnFormat;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.file.SegmentFileBuilder;
+import org.apache.druid.segment.file.SegmentFileBuilderV10;
+import org.apache.druid.segment.file.SegmentFileChannel;
+import org.apache.druid.segment.loading.SegmentizerFactory;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+import org.apache.druid.segment.projections.Projections;
+import org.apache.druid.segment.serde.NullColumnPartSerde;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@link IndexMerger} for creating v10 format segments with {@link
SegmentFileBuilderV10}
+ *
+ * @see SegmentFileBuilderV10
+ * @see org.apache.druid.segment.file.SegmentFileMapperV10 to read the
resulting segment file
+ */
+public class IndexMergerV10 extends IndexMergerBase
+{
+ private static final Logger log = new Logger(IndexMergerV10.class);
+
+ public IndexMergerV10(
+ ObjectMapper mapper,
+ IndexIO indexIO,
+ SegmentWriteOutMediumFactory defaultSegmentWriteOutMediumFactory
+ )
+ {
+super(mapper, indexIO, defaultSegmentWriteOutMediumFactory);
+ }
+
+ @Override
+ protected boolean shouldStoreEmptyColumns()
+ {
+return true;
+ }
+
+ @Override
+ protected File makeIndexFiles(
+ final List adapters,
+ final @Nullable Metadata segmentMetadata,
+ final File outDir,
+ final ProgressIndicator progress,
+ final List mergedDimensionsWithTime, // has both explicit and
implicit dimensions, as well as __time
+ final DimensionsSpecInspector dimensionsSpecInspector,
+ final List mergedMetrics,
+ final Function, TimeAndDimsIterator>
rowMergerFn,
+ final IndexSpec indexSpec,
+ final @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ ) throws IOException
+ {
+final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/";
+
+progress.start();
+progress.progress();
+
+// Merged dimensions without __time.
+List mergedDimensions =
+mergedDimensionsWithTime.stream()
+.filter(dim ->
!ColumnHolder.TIME_COLUMN_NAME.equals(dim))
+.collect(Collectors.toList());
+Closer closer = Closer.create();
+try {
+ final SegmentFileBuilderV10 v10Smoosher = new SegmentFileBuilderV10(
+ mapper,
+ IndexIO.V10_FILE_NAME,
+ outDir,
+ Integer.MAX_VALUE
+ );
+
+ DateTime minTime = DateTimes.MAX;
+ DateTime maxTime = DateTimes.MIN;
+
+ for (IndexableAdapter index : adapters) {
+minTime = JodaUtils.minDateTime(minTime,
index.getDataInterval().getStart());
+maxTime = JodaUtils.maxDateTime(maxTime,
index.getDataInterval().getEnd());
+ }
+ final Interval dataInterval = new Interval(minTime, maxTi
Re: [PR] segment format v10 (druid)
clintropolis commented on code in PR #18880:
URL: https://github.com/apache/druid/pull/18880#discussion_r2672008539
##
processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java:
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.data.BitmapSerdeFactory;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * {@link SegmentFileBuilder} for V10 format segments. Right now, this uses a
{@link FileSmoosher} underneath to build
+ * V9 smoosh files and collect the metadata about the offsets in those
containers, and then appends them into the V10
+ * consolidated segment file after the header and {@link SegmentFileMetadata}
is written.
+ *
+ * V10 file format:
+ * | version (byte) | meta compression (byte) | meta length (int) | meta json
| container 0 | ... | container n |
+ */
+public class SegmentFileBuilderV10 implements SegmentFileBuilder
+{
+ private final ObjectMapper jsonMapper;
+ private final String outputFileName;
+ private final File baseDir;
+ private final long maxChunkSize;
+ private final FileSmoosher smoosher;
+ private final Map externalSegmentFileBuilders;
+ private final Map columns = new TreeMap<>();
+
+ @Nullable
+ private String interval = null;
+ @Nullable
+ private BitmapSerdeFactory bitmapEncoding = null;
+ @Nullable
+ private List projections = null;
+
+ public SegmentFileBuilderV10(
+ ObjectMapper jsonMapper,
+ String outputFileName,
+ File baseDir,
+ long maxChunkSize
+ )
+ {
+this.jsonMapper = jsonMapper;
+this.outputFileName = outputFileName;
+this.baseDir = baseDir;
+this.maxChunkSize = maxChunkSize;
+this.smoosher = new FileSmoosher(baseDir, Ints.checkedCast(maxChunkSize),
outputFileName);
+this.externalSegmentFileBuilders = new TreeMap<>();
+ }
+
+ @Override
+ public void add(String name, File fileToAdd) throws IOException
+ {
+smoosher.add(name, fileToAdd);
+ }
+
+ @Override
+ public void add(String name, ByteBuffer bufferToAdd) throws IOException
+ {
+smoosher.add(name, bufferToAdd);
+ }
+
+ @Override
+ public SegmentFileChannel addWithChannel(String name, long size) throws
IOException
+ {
+return smoosher.addWithChannel(name, size);
+ }
+
+ @Override
+ public SegmentFileBuilder getExternalBuilder(String externalFile)
+ {
+return externalSegmentFileBuilders.computeIfAbsent(
+externalFile,
+(k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir,
maxChunkSize)
+);
+ }
+
+ @Override
+ public void addColumn(String name, ColumnDescriptor columnDescriptor)
+ {
+this.columns.put(name, columnDescriptor);
+ }
+
+ public void addInterval(String interval)
+ {
+this.interval = interval;
+ }
+
+ public void addBitmapEncoding(BitmapSerdeFactory bitmapEncoding)
+ {
+this.bitmapEncoding = bitmapEncoding;
+ }
+
+ public void addProjections(List projections)
+ {
+this.projections = projections;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+for (SegmentFileBuilderV10 externalBuilder :
externalSegmentFileBuilders.values()) {
+ externalBuilder.close();
+}
+
+smoosher.close();
+
+SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata(
+smoosher.getContainers(),
+smoosher.getInternalFiles(),
+interval,
+columns.isEmpty() ? null : columns,
+projections,
+bitmapEncoding
+);
+
+final byte[] metadataBytes =
jsonMapper.writeV
Re: [PR] segment format v10 (druid)
clintropolis commented on code in PR #18880:
URL: https://github.com/apache/druid/pull/18880#discussion_r2672003739
##
processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java:
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.data.BitmapSerdeFactory;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * {@link SegmentFileBuilder} for V10 format segments. Right now, this uses a
{@link FileSmoosher} underneath to build
+ * V9 smoosh files and collect the metadata about the offsets in those
containers, and then appends them into the V10
+ * consolidated segment file after the header and {@link SegmentFileMetadata}
is written.
+ *
+ * V10 file format:
+ * | version (byte) | meta compression (byte) | meta length (int) | meta json
| container 0 | ... | container n |
+ */
+public class SegmentFileBuilderV10 implements SegmentFileBuilder
+{
+ private final ObjectMapper jsonMapper;
+ private final String outputFileName;
+ private final File baseDir;
+ private final long maxChunkSize;
+ private final FileSmoosher smoosher;
+ private final Map externalSegmentFileBuilders;
+ private final Map columns = new TreeMap<>();
+
+ @Nullable
+ private String interval = null;
+ @Nullable
+ private BitmapSerdeFactory bitmapEncoding = null;
+ @Nullable
+ private List projections = null;
+
+ public SegmentFileBuilderV10(
+ ObjectMapper jsonMapper,
+ String outputFileName,
+ File baseDir,
+ long maxChunkSize
+ )
+ {
+this.jsonMapper = jsonMapper;
+this.outputFileName = outputFileName;
+this.baseDir = baseDir;
+this.maxChunkSize = maxChunkSize;
+this.smoosher = new FileSmoosher(baseDir, Ints.checkedCast(maxChunkSize),
outputFileName);
+this.externalSegmentFileBuilders = new TreeMap<>();
+ }
+
+ @Override
+ public void add(String name, File fileToAdd) throws IOException
+ {
+smoosher.add(name, fileToAdd);
+ }
+
+ @Override
+ public void add(String name, ByteBuffer bufferToAdd) throws IOException
+ {
+smoosher.add(name, bufferToAdd);
+ }
+
+ @Override
+ public SegmentFileChannel addWithChannel(String name, long size) throws
IOException
+ {
+return smoosher.addWithChannel(name, size);
+ }
+
+ @Override
+ public SegmentFileBuilder getExternalBuilder(String externalFile)
+ {
+return externalSegmentFileBuilders.computeIfAbsent(
+externalFile,
+(k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir,
maxChunkSize)
+);
+ }
+
+ @Override
+ public void addColumn(String name, ColumnDescriptor columnDescriptor)
+ {
+this.columns.put(name, columnDescriptor);
+ }
+
+ public void addInterval(String interval)
+ {
+this.interval = interval;
+ }
+
+ public void addBitmapEncoding(BitmapSerdeFactory bitmapEncoding)
+ {
+this.bitmapEncoding = bitmapEncoding;
+ }
+
+ public void addProjections(List projections)
+ {
+this.projections = projections;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+for (SegmentFileBuilderV10 externalBuilder :
externalSegmentFileBuilders.values()) {
+ externalBuilder.close();
+}
+
+smoosher.close();
+
+SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata(
+smoosher.getContainers(),
+smoosher.getInternalFiles(),
+interval,
+columns.isEmpty() ? null : columns,
+projections,
+bitmapEncoding
+);
+
+final byte[] metadataBytes =
jsonMapper.writeV
Re: [PR] segment format v10 (druid)
clintropolis commented on code in PR #18880:
URL: https://github.com/apache/druid/pull/18880#discussion_r2672000210
##
processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java:
##
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.column.ColumnFormat;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.file.SegmentFileBuilder;
+import org.apache.druid.segment.file.SegmentFileBuilderV10;
+import org.apache.druid.segment.file.SegmentFileChannel;
+import org.apache.druid.segment.loading.SegmentizerFactory;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+import org.apache.druid.segment.projections.Projections;
+import org.apache.druid.segment.serde.NullColumnPartSerde;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@link IndexMerger} for creating v10 format segments with {@link
SegmentFileBuilderV10}
+ *
+ * @see SegmentFileBuilderV10
+ * @see org.apache.druid.segment.file.SegmentFileMapperV10 to read the
resulting segment file
+ */
+public class IndexMergerV10 extends IndexMergerBase
+{
+ private static final Logger log = new Logger(IndexMergerV10.class);
+
+ public IndexMergerV10(
+ ObjectMapper mapper,
+ IndexIO indexIO,
+ SegmentWriteOutMediumFactory defaultSegmentWriteOutMediumFactory
+ )
+ {
+super(mapper, indexIO, defaultSegmentWriteOutMediumFactory);
+ }
+
+ @Override
+ protected boolean shouldStoreEmptyColumns()
+ {
+return true;
+ }
+
+ @Override
+ protected File makeIndexFiles(
+ final List adapters,
+ final @Nullable Metadata segmentMetadata,
+ final File outDir,
+ final ProgressIndicator progress,
+ final List mergedDimensionsWithTime, // has both explicit and
implicit dimensions, as well as __time
+ final DimensionsSpecInspector dimensionsSpecInspector,
+ final List mergedMetrics,
+ final Function, TimeAndDimsIterator>
rowMergerFn,
+ final IndexSpec indexSpec,
+ final @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ ) throws IOException
+ {
+final String basePrefix = Projections.BASE_TABLE_PROJECTION_NAME + "/";
+
+progress.start();
+progress.progress();
+
+// Merged dimensions without __time.
+List mergedDimensions =
+mergedDimensionsWithTime.stream()
+.filter(dim ->
!ColumnHolder.TIME_COLUMN_NAME.equals(dim))
+.collect(Collectors.toList());
+Closer closer = Closer.create();
+try {
+ final SegmentFileBuilderV10 v10Smoosher = new SegmentFileBuilderV10(
+ mapper,
+ IndexIO.V10_FILE_NAME,
+ outDir,
+ Integer.MAX_VALUE
+ );
+
+ DateTime minTime = DateTimes.MAX;
+ DateTime maxTime = DateTimes.MIN;
+
+ for (IndexableAdapter index : adapters) {
+minTime = JodaUtils.minDateTime(minTime,
index.getDataInterval().getStart());
+maxTime = JodaUtils.maxDateTime(maxTime,
index.getDataInterval().getEnd());
+ }
+ final Interval dataInterval = new Interval(minTime, maxTi
Re: [PR] segment format v10 (druid)
jtuglu1 commented on PR #18880: URL: https://github.com/apache/druid/pull/18880#issuecomment-3721587955 > > Check out https://github.com/lancedb/lance! > > Interesting, but unless I am missing something this looks like it just has its own 'at rest' format inspired by some Arrow and Parquet stuff, see https://lance.org/format/file/ and https://lance.org/format/file/encoding/, and can convert to actual Arrow format for interop stuff. Reading through ^ there is a lot of overlap in how we do things in our format, we just have not formalized/genericized the various 'structural encodings' as they call them and are internal implementation details of column serializer/deserializers, and some differences in how metadata about the contents is stored. Yeah they have their own file format (at rest storage) but they use arrow for all IPC transfer. This makes it super easy to read/write Lance files with other things that speak Arrow (pandas, datafusion, other DB engines, etc.). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] segment format v10 (druid)
clintropolis commented on PR #18880: URL: https://github.com/apache/druid/pull/18880#issuecomment-3721582374 > Check out https://github.com/lancedb/lance! Interesting, but unless I am missing something this looks like it just has its own 'at rest' format inspired by some Arrow and Parquet stuff, see https://lance.org/format/file/ and https://lance.org/format/file/encoding/, and can convert to actual Arrow format for interop stuff. Reading through ^ there is a lot of overlap in how we do things in our format, we just have not formalized/genericized the various 'structural encodings' as they call them and are internal implementation details of column serializer/deserializers, and some differences in how metadata about the contents is stored. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] segment format v10 (druid)
jtuglu1 commented on PR #18880: URL: https://github.com/apache/druid/pull/18880#issuecomment-3721354011 > > 👍 A few questions: > > ``` > > * Will this support reading a single column from a segment (instead of needing to download + scan entire segment)? I guess mapping offset ranges of a segment file is analogous to the row-group concept in Parquet. > > ``` > > Definitely supporting partial downloads at the level of columns and/or projections is a goal of this format, and something it would enable doing. > > > ``` > > * Are there any thoughts to make Druid formats Arrow-compatible? This would open up many more integrations with existing big data ecosystem externally, as well as making intra-cluster data transfer potentially much faster (send everything as RecordBatch). > > ``` > > For intra-cluster data transfer, the MSQ query paths (which to me are the ones I want to focus on 😄) are using Frames, which are similar to Arrow in efficiency. For integrating with the big data ecosystem in ways that require actually using Arrow, there is a question about whether we're doing something for data in flight (RPC) or for data at rest (in object storage). For RPC I think an API that returns Arrow streams can make sense in theory. It wouldn't be related to the segment format, it would be more related to the query side. For data at rest, I don't know how much sense that makes. I haven't heard much of people using Arrow for data at rest. Check out https://github.com/lancedb/lance! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] segment format v10 (druid)
gianm commented on PR #18880: URL: https://github.com/apache/druid/pull/18880#issuecomment-3721345028 > 👍 A few questions: > > * Will this support reading a single column from a segment (instead of needing to download + scan entire segment)? I guess mapping offset ranges of a segment file is analogous to the row-group concept in Parquet. Definitely supporting partial downloads at the level of columns and/or projections is a goal of this format, and something it would enable doing. > * Are there any thoughts to make Druid formats Arrow-compatible? This would open up many more integrations with existing big data ecosystem externally, as well as making intra-cluster data transfer potentially much faster (send everything as RecordBatch). For intra-cluster data transfer, the MSQ query paths (which to me are the ones I want to focus on 😄) are using Frames, which are similar to Arrow in efficiency. For integrating with the big data ecosystem in ways that require actually using Arrow, there is a question about whether we're doing something for data in flight (RPC) or for data at rest (in object storage). For RPC I think an API that returns Arrow streams can make sense in theory. It wouldn't be related to the segment format, it would be more related to the query side. For data at rest, I don't know how much sense that makes. I haven't heard much of people using Arrow for data at rest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] segment format v10 (druid)
gianm commented on code in PR #18880:
URL: https://github.com/apache/druid/pull/18880#discussion_r2670429263
##
processing/src/main/java/org/apache/druid/segment/file/SegmentFileBuilderV10.java:
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.data.BitmapSerdeFactory;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.projections.ProjectionMetadata;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * {@link SegmentFileBuilder} for V10 format segments. Right now, this uses a
{@link FileSmoosher} underneath to build
+ * V9 smoosh files and collect the metadata about the offsets in those
containers, and then appends them into the V10
+ * consolidated segment file after the header and {@link SegmentFileMetadata}
is written.
+ *
+ * V10 file format:
+ * | version (byte) | meta compression (byte) | meta length (int) | meta json
| container 0 | ... | container n |
+ */
+public class SegmentFileBuilderV10 implements SegmentFileBuilder
+{
+ private final ObjectMapper jsonMapper;
+ private final String outputFileName;
+ private final File baseDir;
+ private final long maxChunkSize;
+ private final FileSmoosher smoosher;
+ private final Map externalSegmentFileBuilders;
+ private final Map columns = new TreeMap<>();
+
+ @Nullable
+ private String interval = null;
+ @Nullable
+ private BitmapSerdeFactory bitmapEncoding = null;
+ @Nullable
+ private List projections = null;
+
+ public SegmentFileBuilderV10(
+ ObjectMapper jsonMapper,
+ String outputFileName,
+ File baseDir,
+ long maxChunkSize
+ )
+ {
+this.jsonMapper = jsonMapper;
+this.outputFileName = outputFileName;
+this.baseDir = baseDir;
+this.maxChunkSize = maxChunkSize;
+this.smoosher = new FileSmoosher(baseDir, Ints.checkedCast(maxChunkSize),
outputFileName);
+this.externalSegmentFileBuilders = new TreeMap<>();
+ }
+
+ @Override
+ public void add(String name, File fileToAdd) throws IOException
+ {
+smoosher.add(name, fileToAdd);
+ }
+
+ @Override
+ public void add(String name, ByteBuffer bufferToAdd) throws IOException
+ {
+smoosher.add(name, bufferToAdd);
+ }
+
+ @Override
+ public SegmentFileChannel addWithChannel(String name, long size) throws
IOException
+ {
+return smoosher.addWithChannel(name, size);
+ }
+
+ @Override
+ public SegmentFileBuilder getExternalBuilder(String externalFile)
+ {
+return externalSegmentFileBuilders.computeIfAbsent(
+externalFile,
+(k) -> new SegmentFileBuilderV10(jsonMapper, externalFile, baseDir,
maxChunkSize)
+);
+ }
+
+ @Override
+ public void addColumn(String name, ColumnDescriptor columnDescriptor)
+ {
+this.columns.put(name, columnDescriptor);
+ }
+
+ public void addInterval(String interval)
+ {
+this.interval = interval;
+ }
+
+ public void addBitmapEncoding(BitmapSerdeFactory bitmapEncoding)
+ {
+this.bitmapEncoding = bitmapEncoding;
+ }
+
+ public void addProjections(List projections)
+ {
+this.projections = projections;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+for (SegmentFileBuilderV10 externalBuilder :
externalSegmentFileBuilders.values()) {
+ externalBuilder.close();
+}
+
+smoosher.close();
+
+SegmentFileMetadata segmentFileMetadata = new SegmentFileMetadata(
+smoosher.getContainers(),
+smoosher.getInternalFiles(),
+interval,
+columns.isEmpty() ? null : columns,
+projections,
+bitmapEncoding
+);
+
+final byte[] metadataBytes =
jsonMapper.writeValueAsB
Re: [PR] segment format v10 (druid)
github-advanced-security[bot] commented on code in PR #18880:
URL: https://github.com/apache/druid/pull/18880#discussion_r2663606645
##
processing/src/main/java/org/apache/druid/segment/projections/AggregateProjectionSchema.java:
##
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.projections;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.query.OrderBy;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.CursorBuildSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class AggregateProjectionSchema implements ProjectionSchema
+{
+ /**
+ * It is not likely the best way to find the best matching projections, but
it is the one we have for now. This
+ * comparator is used to sort all the projections in a segment "best" first,
where best is defined as fewest grouping
+ * columns, most virtual columns and aggregators, as an approximation of
likely to have the fewest number of rows to
+ * scan.
+ */
+ public static final Comparator COMPARATOR = (o1,
o2) -> {
+// coarsest granularity first
+if
(o1.getEffectiveGranularity().isFinerThan(o2.getEffectiveGranularity())) {
+ return 1;
+}
+if
(o2.getEffectiveGranularity().isFinerThan(o1.getEffectiveGranularity())) {
+ return -1;
+}
+// fewer dimensions first
+final int dimsCompare = Integer.compare(
+o1.groupingColumns.size(),
+o2.groupingColumns.size()
+);
+if (dimsCompare != 0) {
+ return dimsCompare;
+}
+// more metrics first
+int metCompare = Integer.compare(o2.aggregators.length,
o1.aggregators.length);
+if (metCompare != 0) {
+ return metCompare;
+}
+// more virtual columns first
+final int virtCompare = Integer.compare(
+o2.virtualColumns.getVirtualColumns().length,
+o1.virtualColumns.getVirtualColumns().length
+);
+if (virtCompare != 0) {
+ return virtCompare;
+}
+return o1.name.compareTo(o2.name);
+ };
+
+ private final String name;
+ @Nullable
+ private final String timeColumnName;
+ @Nullable
+ private final DimFilter filter;
+ private final VirtualColumns virtualColumns;
+ private final List groupingColumns;
+ private final AggregatorFactory[] aggregators;
+ private final List ordering;
+ private final List orderingWithTimeSubstitution;
+
+ // computed fields
+ private final int timeColumnPosition;
+ private final Granularity effectiveGranularity;
+
+ @JsonCreator
+ public AggregateProjectionSchema(
+ @JsonProperty("name") String name,
+ @JsonProperty("timeColumnName") @Nullable String timeColumnName,
+ @JsonProperty("filter") @Nullable DimFilter filter,
+ @JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns,
+ @JsonProperty("groupingColumns") @Nullable List groupingColumns,
+ @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators,
+ @JsonProperty("ordering") List ordering
+ )
+ {
+if (name == null || name.isEmpty()) {
+ throw DruidException.defensive("projection schema name cannot be null or
empty");
+}
+this.name = name;
+if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null
|| aggregators.length == 0)) {
+
Re: [PR] segment format v10 (druid)
jtuglu1 commented on PR #18880: URL: https://github.com/apache/druid/pull/18880#issuecomment-3704016683 👍 A few questions: - Will this support reading a single column from a segment (instead of needing to download + scan entire segment)? - Are there any thoughts to make Druid formats Arrow-compatible? This would open up many more integrations with the existing ecosystem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] segment format v10 (druid)
github-advanced-security[bot] commented on code in PR #18880:
URL: https://github.com/apache/druid/pull/18880#discussion_r2655981947
##
processing/src/main/java/org/apache/druid/segment/file/SegmentFileMapperV10.java:
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link SegmentFileMapper} implementation for V10 segment files.
+ *
+ * V10 file format:
+ * | version (byte) | meta compression (byte) | meta length (int) | meta json
| chunk 0 | chunk 1 | ... | chunk n |
+ */
+public class SegmentFileMapperV10 implements SegmentFileMapper
+{
+ /**
+ * Create a v10 {@link SegmentFileMapper} with 'external' attached v10
segment files
+ *
+ * @param segmentFile v10 segment file with name {@link
IndexIO#V10_FILE_NAME}
+ * @param mapper json mapper to deserialize metadata
+ * @param externals list of 'external' v10 segment files to attach to this
mapper and files that can be referenced
+ *using {@link #mapExternalFile(String, String)}
+ * @return v10 {@link SegmentFileMapper} using memory mapped {@link
ByteBuffer}
+ * @throws IOException
+ */
+ public static SegmentFileMapperV10 create(
+ File segmentFile,
+ ObjectMapper mapper,
+ List externals
+ ) throws IOException
+ {
+final SegmentFileMapperV10 entryPoint = create(segmentFile, mapper);
+
+final Map externalMappers = new HashMap<>();
+try {
+ for (String filename : externals) {
+final File externalFile = new File(segmentFile.getParentFile(),
filename);
+if (externalFile.exists()) {
+ externalMappers.put(filename, create(externalFile, mapper));
+}
+ }
+}
+catch (Throwable t) {
+ Closer closer = Closer.create();
+ closer.registerAll(externalMappers.values());
+ throw CloseableUtils.closeAndWrapInCatch(t, closer);
+}
+
+return new SegmentFileMapperV10(
+entryPoint.segmentFile,
+entryPoint.segmentFileMetadata,
+entryPoint.containers,
+externalMappers
+);
+ }
+
+ /**
+ * Create a v10 {@link SegmentFileMapper}
+ */
+ public static SegmentFileMapperV10 create(
+ File segmentFile,
+ ObjectMapper mapper
+ ) throws IOException
+ {
+try (FileInputStream fis = new FileInputStream(segmentFile)) {
+ // version (byte) | metadata compression (byte) | metadata length (int)
+ byte[] header = new byte[1 + 1 + Integer.BYTES];
+ int read = fis.read(header);
+ if (read < header.length) {
+throw DruidException.defensive("expected at least [%s] bytes, but only
read [%s]", header.length, read);
+ }
+ ByteBuffer headerBuffer = ByteBuffer.wrap(header);
+ headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ if (headerBuffer.get(0) != IndexIO.V10_VERSION) {
+throw DruidException.defensive("not v10, got[%s] instead",
headerBuffer.get(0));
+ }
+
+ // ideally we should make compression work, right now only uncompressed
is supported (we probably need to add
+ // another int for compressed length if strategy is to be compressed)
+ byte compression = headerBuffer.get(1);
+ CompressionStrategy compressionStrategy =
CompressionStrategy.forId(compression);
+ if (!CompressionStrategy.N
