Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/8747#discussion_r40958537
--- Diff:
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen;
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write data into global row buffer using `UnsafeRow` format,
+ * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
+ */
+public class UnsafeRowWriter {
+
+ private GlobalBufferHolder holder;
+ // The offset of the global buffer where we start to write this row.
+ private int startingOffset;
+ private int nullBitsSize;
+
+ public void initialize(GlobalBufferHolder holder, int numFields) {
+ this.holder = holder;
+ this.startingOffset = holder.cursor;
+ this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
+
+ // grow the global buffer to make sure it has enough space to write fixed-length data.
+ final int fixedSize = nullBitsSize + 8 * numFields;
+ holder.grow(fixedSize);
+ holder.cursor += fixedSize;
+
+ // zero-out the null bits region
+ for (int i = 0; i < nullBitsSize; i += 8) {
+ Platform.putLong(holder.buffer, startingOffset + i, 0L);
+ }
+ }
+
+ private void zeroOutPaddingBytes(int numBytes) {
+ if ((numBytes & 0x07) > 0) {
+ Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) <<
3), 0L);
+ }
+ }
+
+ public void setNullAt(int ordinal) {
+ BitSetMethods.set(holder.buffer, startingOffset, ordinal);
+ Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
+ }
+
+ public long getFieldOffset(int ordinal) {
+ return startingOffset + nullBitsSize + 8 * ordinal;
+ }
+
+ public void setOffsetAndSize(int ordinal, long size) {
+ setOffsetAndSize(ordinal, holder.cursor, size);
+ }
+
+ public void setOffsetAndSize(int ordinal, long currentCursor, long size)
{
+ final long relativeOffset = currentCursor - startingOffset;
+ final long fieldOffset = getFieldOffset(ordinal);
+ final long offsetAndSize = (relativeOffset << 32) | size;
+
+ Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
+ }
+
+ // Do word alignment for this row and return the number of bytes padded.
+ // todo: remove this after we make unsafe array data word align.
+ public void alignWords(int numBytes) {
+ final int remainder = numBytes & 0x07;
+
+ if (remainder > 0) {
+ final int paddingBytes = 8 - remainder;
+ holder.grow(paddingBytes);
+
+ final byte[] orignalValues = new byte[remainder];
+ Platform.copyMemory(
+ holder.buffer,
+ holder.cursor - remainder,
+ orignalValues,
+ Platform.BYTE_ARRAY_OFFSET,
+ remainder);
+
+ Platform.putLong(holder.buffer, holder.cursor - remainder, 0);
+
+ Platform.copyMemory(
+ orignalValues,
+ Platform.BYTE_ARRAY_OFFSET,
+ holder.buffer,
+ holder.cursor - remainder,
+ remainder);
+
+ holder.cursor += paddingBytes;
+ }
+ }
+
+
+
+ public void writeCompactDecimal(int ordinal, Decimal input, int
precision, int scale) {
+ // make sure Decimal object has the same scale as DecimalType
+ if (input.changePrecision(precision, scale)) {
+ Platform.putLong(holder.buffer, getFieldOffset(ordinal),
input.toUnscaledLong());
+ } else {
+ setNullAt(ordinal);
+ }
+ }
+
+ public void write(int ordinal, Decimal input, int precision, int scale) {
+ // grow the global buffer before writing data.
+ holder.grow(16);
+
+ // zero-out the bytes
+ Platform.putLong(holder.buffer, holder.cursor, 0L);
+ Platform.putLong(holder.buffer, holder.cursor + 8, 0L);
+
+ // Make sure Decimal object has the same scale as DecimalType.
+ // Note that we may pass in null Decimal object to set null for it.
+ if (input == null || !input.changePrecision(precision, scale)) {
+ BitSetMethods.set(holder.buffer, startingOffset, ordinal);
+ // keep the offset for future update
+ final long relativeOffset = holder.cursor - startingOffset;
+ Platform.putLong(holder.buffer, getFieldOffset(ordinal),
relativeOffset << 32);
--- End diff --
setOffsetAndSize(ordinal, 0)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]