This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2b09676 One to many records from record reader/decoder (#5430) 2b09676 is described below commit 2b09676ace59f737409348c184900d074e5e4051 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Tue Jun 9 13:01:32 2020 -0700 One to many records from record reader/decoder (#5430) Handles the case where a record reader generates multiple records in an invocation of next(). Note: None of our record readers have the ability to generate multiple records yet. This change assumes a custom record reader has been plugged in, which is generating multiple records --- .../realtime/LLRealtimeSegmentDataManager.java | 37 +++++-- .../RecordReaderSegmentCreationDataSource.java | 22 +++- .../impl/SegmentIndexCreationDriverImpl.java | 38 +++++-- .../SegmentGenerationWithMultipleRecordsKey.java | 111 +++++++++++++++++++++ .../apache/pinot/spi/data/readers/GenericRow.java | 3 + 5 files changed, 189 insertions(+), 22 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 30569df..70baf3e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -26,6 +26,7 @@ import java.io.File; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -469,19 +470,33 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { messagesAndOffsets.getMessageLengthAtIndex(index), reuse); if (decodedRow != null) { try { - GenericRow transformedRow = _recordTransformer.transform(decodedRow); - - if (transformedRow != null) { - realtimeRowsConsumedMeter = _serverMetrics - .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter); - indexedMessageCount++; + if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) { + for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) { + GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow); + if (transformedRow != null) { + realtimeRowsConsumedMeter = _serverMetrics + .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter); + indexedMessageCount++; + canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata); + } else { + realtimeRowsDroppedMeter = _serverMetrics + .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, + realtimeRowsDroppedMeter); + } + } } else { - realtimeRowsDroppedMeter = _serverMetrics - .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, - realtimeRowsDroppedMeter); + GenericRow transformedRow = _recordTransformer.transform(decodedRow); + if (transformedRow != null) { + realtimeRowsConsumedMeter = _serverMetrics + .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter); + indexedMessageCount++; + canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata); + } else { + realtimeRowsDroppedMeter = _serverMetrics + .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1, + realtimeRowsDroppedMeter); + } } - - canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata); } catch (Exception e) { segmentLogger.error("Caught exception while transforming the record: {}", decodedRow, e); _numRowsErrored++; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java index 6722524..d84f1b9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java @@ -18,7 +18,12 @@ */ package org.apache.pinot.core.segment.creator; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import org.apache.pinot.common.Utils; +import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.core.data.recordtransformer.CompositeTransformer; @@ -55,9 +60,20 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat GenericRow reuse = new GenericRow(); while (_recordReader.hasNext()) { reuse.clear(); - GenericRow transformedRow = recordTransformer.transform(_recordReader.next(reuse)); - if (transformedRow != null) { - collector.collectRow(transformedRow); + + reuse = _recordReader.next(reuse); + if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) { + for (Object singleRow : (Collection) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) { + GenericRow transformedRow = recordTransformer.transform((GenericRow) singleRow); + if (transformedRow != null) { + collector.collectRow(transformedRow); + } + } + } else { + GenericRow transformedRow = recordTransformer.transform(reuse); + if (transformedRow != null) { + collector.collectRow(transformedRow); + } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java index b969075..124113e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -24,12 +24,15 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.core.data.readers.PinotSegmentRecordReader; import org.apache.pinot.core.data.recordtransformer.CompositeTransformer; import org.apache.pinot.core.data.recordtransformer.RecordTransformer; @@ -177,15 +180,34 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive LOGGER.info("Start building IndexCreator!"); GenericRow reuse = new GenericRow(); while (recordReader.hasNext()) { - long start = System.currentTimeMillis(); + long recordReadStartTime = System.currentTimeMillis(); + long recordReadStopTime; + long indexStopTime; reuse.clear(); - GenericRow transformedRow = _recordTransformer.transform(recordReader.next(reuse)); - long stop = System.currentTimeMillis(); - totalRecordReadTime += (stop - start); - if (transformedRow != null) { - indexCreator.indexRow(transformedRow); - long stop1 = System.currentTimeMillis(); - totalIndexTime += (stop1 - stop); + GenericRow decodedRow = recordReader.next(reuse); + if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) { + recordReadStopTime = System.currentTimeMillis(); + totalRecordReadTime += (recordReadStopTime - recordReadStartTime); + for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) { + recordReadStartTime = System.currentTimeMillis(); + GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow); + recordReadStopTime = System.currentTimeMillis(); + totalRecordReadTime += (recordReadStopTime - recordReadStartTime); + if (transformedRow != null) { + indexCreator.indexRow(transformedRow); + indexStopTime = System.currentTimeMillis(); + totalIndexTime += (indexStopTime - recordReadStopTime); + } + } + } else { + GenericRow transformedRow = _recordTransformer.transform(decodedRow); + recordReadStopTime = System.currentTimeMillis(); + totalRecordReadTime += (recordReadStopTime - recordReadStartTime); + if (transformedRow != null) { + indexCreator.indexRow(transformedRow); + indexStopTime = System.currentTimeMillis(); + totalIndexTime += (indexStopTime - recordReadStopTime); + } } } } catch (Exception e) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithMultipleRecordsKey.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithMultipleRecordsKey.java new file mode 100644 index 0000000..9e6450a --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithMultipleRecordsKey.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.index.creator; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.commons.lang.math.RandomUtils; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.core.data.readers.GenericRowRecordReader; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.core.segment.store.SegmentDirectory; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class SegmentGenerationWithMultipleRecordsKey { + private static final String SUB_COLUMN_1 = "sub1"; + private static final String SUB_COLUMN_2 = "sub2"; + private static final String SEGMENT_DIR_NAME = + System.getProperty("java.io.tmpdir") + File.separator + "segmentMultipleRecordsTest"; + private static final String SEGMENT_NAME = "testSegment"; + + private Schema _schema; + private TableConfig _tableConfig; + + @BeforeClass + public void setup() { + _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build(); + _schema = new Schema.SchemaBuilder().addSingleValueDimension(SUB_COLUMN_1, FieldSpec.DataType.STRING) + .addMetric(SUB_COLUMN_2, FieldSpec.DataType.LONG).build(); + } + + @BeforeMethod + public void reset() { + FileUtils.deleteQuietly(new File(SEGMENT_DIR_NAME)); + } + + @Test + public void testNumDocs() + throws Exception { + File segmentDir = buildSegment(_tableConfig, _schema); + SegmentMetadataImpl metadata = SegmentDirectory.loadSegmentMetadata(segmentDir); + Assert.assertEquals(metadata.getTotalDocs(), 6); + Assert.assertTrue(metadata.getAllColumns().containsAll(Sets.newHashSet(SUB_COLUMN_1, SUB_COLUMN_2))); + } + + private File buildSegment(final TableConfig tableConfig, final Schema schema) + throws Exception { + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setOutDir(SEGMENT_DIR_NAME); + config.setSegmentName(SEGMENT_NAME); + + List<GenericRow> rows = new ArrayList<>(3); + + GenericRow genericRow1 = new GenericRow(); + genericRow1.putValue(GenericRow.MULTIPLE_RECORDS_KEY, + Lists.newArrayList(getRandomArrayElement(), getRandomArrayElement(), getRandomArrayElement())); + rows.add(genericRow1); + GenericRow genericRow2 = new GenericRow(); + genericRow2.putValue(GenericRow.MULTIPLE_RECORDS_KEY, Lists.newArrayList(getRandomArrayElement())); + rows.add(genericRow2); + GenericRow genericRow3 = new GenericRow(); + genericRow3.putValue(GenericRow.MULTIPLE_RECORDS_KEY, + Lists.newArrayList(getRandomArrayElement(), getRandomArrayElement())); + rows.add(genericRow3); + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(rows)); + driver.build(); + driver.getOutputDirectory().deleteOnExit(); + return driver.getOutputDirectory(); + } + + private GenericRow getRandomArrayElement() { + GenericRow element = new GenericRow(); + element.putValue(SUB_COLUMN_1, RandomStringUtils.randomAlphabetic(4)); + element.putValue(SUB_COLUMN_2, RandomUtils.nextLong()); + return element; + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java index 661053a..580a1ab 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java @@ -48,6 +48,9 @@ import org.apache.pinot.spi.utils.JsonUtils; * We should not be using Boolean, Byte, Character and Short to keep it simple */ public class GenericRow { + + public static final String MULTIPLE_RECORDS_KEY = "$MULTIPLE_RECORDS_KEY$"; + private final Map<String, Object> _fieldToValueMap = new HashMap<>(); private final Set<String> _nullValueFields = new HashSet<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org