This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b09676  One to many records from record reader/decoder (#5430)
2b09676 is described below

commit 2b09676ace59f737409348c184900d074e5e4051
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Tue Jun 9 13:01:32 2020 -0700

    One to many records from record reader/decoder (#5430)
    
    Handles the case where a record reader generates multiple records in a single invocation of next().
    Note: none of our record readers can generate multiple records yet. This change assumes that a custom record reader which generates multiple records has been plugged in.
---
 .../realtime/LLRealtimeSegmentDataManager.java     |  37 +++++--
 .../RecordReaderSegmentCreationDataSource.java     |  22 +++-
 .../impl/SegmentIndexCreationDriverImpl.java       |  38 +++++--
 .../SegmentGenerationWithMultipleRecordsKey.java   | 111 +++++++++++++++++++++
 .../apache/pinot/spi/data/readers/GenericRow.java  |   3 +
 5 files changed, 189 insertions(+), 22 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 30569df..70baf3e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -469,19 +470,33 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
       if (decodedRow != null) {
         try {
-          GenericRow transformedRow = _recordTransformer.transform(decodedRow);
-
-          if (transformedRow != null) {
-            realtimeRowsConsumedMeter = _serverMetrics
-                .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter);
-            indexedMessageCount++;
+          if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
+            for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+              GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
+              if (transformedRow != null) {
+                realtimeRowsConsumedMeter = _serverMetrics
+                    .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter);
+                indexedMessageCount++;
+                canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+              } else {
+                realtimeRowsDroppedMeter = _serverMetrics
+                    .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                        realtimeRowsDroppedMeter);
+              }
+            }
           } else {
-            realtimeRowsDroppedMeter = _serverMetrics
-                .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
-                    realtimeRowsDroppedMeter);
+            GenericRow transformedRow = _recordTransformer.transform(decodedRow);
+            if (transformedRow != null) {
+              realtimeRowsConsumedMeter = _serverMetrics
+                  .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter);
+              indexedMessageCount++;
+              canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
+            } else {
+              realtimeRowsDroppedMeter = _serverMetrics
+                  .addMeteredTableValue(_metricKeyName, ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+                      realtimeRowsDroppedMeter);
+            }
           }
-
-          canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
         } catch (Exception e) {
           segmentLogger.error("Caught exception while transforming the record: {}", decodedRow, e);
           _numRowsErrored++;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
index 6722524..d84f1b9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
@@ -18,7 +18,12 @@
  */
 package org.apache.pinot.core.segment.creator;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
@@ -55,9 +60,20 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
       GenericRow reuse = new GenericRow();
       while (_recordReader.hasNext()) {
         reuse.clear();
-        GenericRow transformedRow = recordTransformer.transform(_recordReader.next(reuse));
-        if (transformedRow != null) {
-          collector.collectRow(transformedRow);
+
+        reuse = _recordReader.next(reuse);
+        if (reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
+          for (Object singleRow : (Collection) reuse.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+            GenericRow transformedRow = recordTransformer.transform((GenericRow) singleRow);
+            if (transformedRow != null) {
+              collector.collectRow(transformedRow);
+            }
+          }
+        } else {
+          GenericRow transformedRow = recordTransformer.transform(reuse);
+          if (transformedRow != null) {
+            collector.collectRow(transformedRow);
+          }
         }
       }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index b969075..124113e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -24,12 +24,15 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
@@ -177,15 +180,34 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
       LOGGER.info("Start building IndexCreator!");
       GenericRow reuse = new GenericRow();
       while (recordReader.hasNext()) {
-        long start = System.currentTimeMillis();
+        long recordReadStartTime = System.currentTimeMillis();
+        long recordReadStopTime;
+        long indexStopTime;
         reuse.clear();
-        GenericRow transformedRow = _recordTransformer.transform(recordReader.next(reuse));
-        long stop = System.currentTimeMillis();
-        totalRecordReadTime += (stop - start);
-        if (transformedRow != null) {
-          indexCreator.indexRow(transformedRow);
-          long stop1 = System.currentTimeMillis();
-          totalIndexTime += (stop1 - stop);
+        GenericRow decodedRow = recordReader.next(reuse);
+        if (decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY) != null) {
+          recordReadStopTime = System.currentTimeMillis();
+          totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
+          for (Object singleRow : (Collection) decodedRow.getValue(GenericRow.MULTIPLE_RECORDS_KEY)) {
+            recordReadStartTime = System.currentTimeMillis();
+            GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
+            recordReadStopTime = System.currentTimeMillis();
+            totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
+            if (transformedRow != null) {
+              indexCreator.indexRow(transformedRow);
+              indexStopTime = System.currentTimeMillis();
+              totalIndexTime += (indexStopTime - recordReadStopTime);
+            }
+          }
+        } else {
+          GenericRow transformedRow = _recordTransformer.transform(decodedRow);
+          recordReadStopTime = System.currentTimeMillis();
+          totalRecordReadTime += (recordReadStopTime - recordReadStartTime);
+          if (transformedRow != null) {
+            indexCreator.indexRow(transformedRow);
+            indexStopTime = System.currentTimeMillis();
+            totalIndexTime += (indexStopTime - recordReadStopTime);
+          }
         }
       }
     } catch (Exception e) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithMultipleRecordsKey.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithMultipleRecordsKey.java
new file mode 100644
index 0000000..9e6450a
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithMultipleRecordsKey.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.creator;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class SegmentGenerationWithMultipleRecordsKey {
+  private static final String SUB_COLUMN_1 = "sub1";
+  private static final String SUB_COLUMN_2 = "sub2";
+  private static final String SEGMENT_DIR_NAME =
+      System.getProperty("java.io.tmpdir") + File.separator + "segmentMultipleRecordsTest";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private Schema _schema;
+  private TableConfig _tableConfig;
+
+  @BeforeClass
+  public void setup() {
+    _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
+    _schema = new Schema.SchemaBuilder().addSingleValueDimension(SUB_COLUMN_1, FieldSpec.DataType.STRING)
+        .addMetric(SUB_COLUMN_2, FieldSpec.DataType.LONG).build();
+  }
+
+  @BeforeMethod
+  public void reset() {
+    FileUtils.deleteQuietly(new File(SEGMENT_DIR_NAME));
+  }
+
+  @Test
+  public void testNumDocs()
+      throws Exception {
+    File segmentDir = buildSegment(_tableConfig, _schema);
+    SegmentMetadataImpl metadata = SegmentDirectory.loadSegmentMetadata(segmentDir);
+    Assert.assertEquals(metadata.getTotalDocs(), 6);
+    Assert.assertTrue(metadata.getAllColumns().containsAll(Sets.newHashSet(SUB_COLUMN_1, SUB_COLUMN_2)));
+  }
+
+  private File buildSegment(final TableConfig tableConfig, final Schema schema)
+      throws Exception {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(SEGMENT_DIR_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+
+    List<GenericRow> rows = new ArrayList<>(3);
+
+    GenericRow genericRow1 = new GenericRow();
+    genericRow1.putValue(GenericRow.MULTIPLE_RECORDS_KEY,
+        Lists.newArrayList(getRandomArrayElement(), getRandomArrayElement(), getRandomArrayElement()));
+    rows.add(genericRow1);
+    GenericRow genericRow2 = new GenericRow();
+    genericRow2.putValue(GenericRow.MULTIPLE_RECORDS_KEY, Lists.newArrayList(getRandomArrayElement()));
+    rows.add(genericRow2);
+    GenericRow genericRow3 = new GenericRow();
+    genericRow3.putValue(GenericRow.MULTIPLE_RECORDS_KEY,
+        Lists.newArrayList(getRandomArrayElement(), getRandomArrayElement()));
+    rows.add(genericRow3);
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(config, new GenericRowRecordReader(rows));
+    driver.build();
+    driver.getOutputDirectory().deleteOnExit();
+    return driver.getOutputDirectory();
+  }
+
+  private GenericRow getRandomArrayElement() {
+    GenericRow element = new GenericRow();
+    element.putValue(SUB_COLUMN_1, RandomStringUtils.randomAlphabetic(4));
+    element.putValue(SUB_COLUMN_2, RandomUtils.nextLong());
+    return element;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 661053a..580a1ab 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -48,6 +48,9 @@ import org.apache.pinot.spi.utils.JsonUtils;
  *  We should not be using Boolean, Byte, Character and Short to keep it simple
  */
 public class GenericRow {
+
+  public static final String MULTIPLE_RECORDS_KEY = "$MULTIPLE_RECORDS_KEY$";
+
   private final Map<String, Object> _fieldToValueMap = new HashMap<>();
   private final Set<String> _nullValueFields = new HashSet<>();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to