Github user scottcarey commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21070#discussion_r182161734
  
    --- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
    @@ -63,115 +59,159 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
         }
       }
     
    +  private ByteBuffer getBuffer(int length) {
    +    try {
    +      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
    +    } catch (IOException e) {
    +      throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
    +    }
    +  }
    +
       @Override
       public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
    -    c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
    -    offset += 4 * total;
    +    int requiredBytes = total * 4;
    +    ByteBuffer buffer = getBuffer(requiredBytes);
    +
    +    if (buffer.hasArray()) {
    +      int offset = buffer.arrayOffset() + buffer.position();
    +      c.putIntsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
    +    } else {
    +      for (int i = 0; i < total; i += 1) {
    +        c.putInt(rowId + i, buffer.getInt());
    +      }
    +    }
       }
     
       @Override
       public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
    -    c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
    -    offset += 8 * total;
    +    int requiredBytes = total * 8;
    +    ByteBuffer buffer = getBuffer(requiredBytes);
    +
    +    if (buffer.hasArray()) {
    +      int offset = buffer.arrayOffset() + buffer.position();
    +      c.putLongsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
    +    } else {
    +      for (int i = 0; i < total; i += 1) {
    +        c.putLong(rowId + i, buffer.getLong());
    +      }
    +    }
       }
     
       @Override
       public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
    -    c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
    -    offset += 4 * total;
    +    int requiredBytes = total * 4;
    +    ByteBuffer buffer = getBuffer(requiredBytes);
    +
    +    if (buffer.hasArray()) {
    +      int offset = buffer.arrayOffset() + buffer.position();
    +      c.putFloats(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
    +    } else {
    +      for (int i = 0; i < total; i += 1) {
    +        c.putFloat(rowId + i, buffer.getFloat());
    +      }
    +    }
       }
     
       @Override
       public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
    -    c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
    -    offset += 8 * total;
    +    int requiredBytes = total * 8;
    +    ByteBuffer buffer = getBuffer(requiredBytes);
    +
    +    if (buffer.hasArray()) {
    +      int offset = buffer.arrayOffset() + buffer.position();
    +      c.putDoubles(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
    +    } else {
    +      for (int i = 0; i < total; i += 1) {
    +        c.putDouble(rowId + i, buffer.getDouble());
    +      }
    +    }
    +  }
    +
    +  private byte getByte() {
    +    try {
    +      return (byte) in.read();
    +    } catch (IOException e) {
    +      throw new ParquetDecodingException("Failed to read a byte", e);
    +    }
       }
     
       @Override
       public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
    -    for (int i = 0; i < total; i++) {
    -      // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
    -      // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
    -      c.putByte(rowId + i, Platform.getByte(buffer, offset));
    -      offset += 4;
    +    int requiredBytes = total * 4;
    +    ByteBuffer buffer = getBuffer(requiredBytes);
    +
    +    for (int i = 0; i < total; i += 1) {
    +      c.putByte(rowId + i, buffer.get());
    +      // skip the next 3 bytes
    +      buffer.position(buffer.position() + 3);
         }
       }
     
       @Override
       public final boolean readBoolean() {
    -    byte b = Platform.getByte(buffer, offset);
    -    boolean v = (b & (1 << bitOffset)) != 0;
    +    // TODO: vectorize decoding and keep boolean[] instead of currentByte
    +    if (bitOffset == 0) {
    +      currentByte = getByte();
    +    }
    +
    +    boolean v = (currentByte & (1 << bitOffset)) != 0;
         bitOffset += 1;
         if (bitOffset == 8) {
           bitOffset = 0;
    -      offset++;
         }
         return v;
       }
     
       @Override
       public final int readInteger() {
    -    int v = Platform.getInt(buffer, offset);
    -    if (bigEndianPlatform) {
    -      v = java.lang.Integer.reverseBytes(v);
    -    }
    -    offset += 4;
    -    return v;
    +    return getBuffer(4).getInt();
       }
     
       @Override
       public final long readLong() {
    -    long v = Platform.getLong(buffer, offset);
    -    if (bigEndianPlatform) {
    -      v = java.lang.Long.reverseBytes(v);
    -    }
    -    offset += 8;
    -    return v;
    +    return getBuffer(8).getLong();
       }
     
       @Override
       public final byte readByte() {
    -    return (byte)readInteger();
    +    return (byte) readInteger();
       }
     
       @Override
       public final float readFloat() {
    -    float v;
    -    if (!bigEndianPlatform) {
    -      v = Platform.getFloat(buffer, offset);
    -    } else {
    -      v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET);
    -    }
    -    offset += 4;
    -    return v;
    +    return getBuffer(4).getFloat();
       }
     
       @Override
       public final double readDouble() {
    -    double v;
    -    if (!bigEndianPlatform) {
    -      v = Platform.getDouble(buffer, offset);
    -    } else {
    -      v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET);
    -    }
    -    offset += 8;
    -    return v;
    +    return getBuffer(8).getDouble();
    --- End diff --
    
    It seems to me that if `ByteBufferInputStream` had a few extra methods 
on it, this would all be easier and potentially faster (though that is 
probably something to address in a separate issue):
    
    `getDouble()`, `getLong()`, `getInt()`, `getBuffer(...)`, etc. The 
`ByteBufferInputStream` itself could have a `ByteOrder` set, and it could 
`duplicate().order(...)` any buffers passed to it with the wrong order.
    
    In other words, a lot of what is done here would probably be both more 
efficient if done inside `ByteBufferInputStream` and useful to other users 
of that class.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to