Hi, we are using the Apache ORC library to generate ORC files. That works generally well but every now and then we run into two kinds of exceptions coming from the ORC writer, on one hand a NullPointerException and on the other hand an IOException caused by an IllegalArgumentException.
I only get these exceptions sporadically about once a week when processing production data (while writing millions of very similar rows during the same time without any problem). Up to now I was not able to reproduce the problem in a test or find out what aspect in the production data causes this. (Note that in our system the production data which has been used to generate the ORC file is not persistently stored and not easily available anymore after the problem has happened.) However, by now I'm quite sure that it's not a single record/row being written which is the cause but rather certain sequences of (possibly hundred thousands) of records/rows. I'm not sure if it is a bug in our code or a bug in the Apache ORC library, but I wouldn't know what to do differently in our code. The NPE looks like this: java.io.IOException: Problem adding row to file:/tmp/TemporaryDiskFileHandler-220511-151014-8678865386464979027/cmd-220511-154712-17198614794670643856/tmp-220511-154712-1756785427558352227.file at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:723) at io.wallee.analyticsserver.job.util.orc.AsOrcWriter.flushBatch(AsOrcWriter.java:201) at io.wallee.analyticsserver.job.util.orc.AsOrcWriter.close(AsOrcWriter.java:155) [... SNIP ...] Caused by: java.lang.NullPointerException Interestingly I don't get any stack trace for the NPE. I tried in tests to write records/rows with any combination of null values I could think of, but I never saw an NPE like this in the tests, only sometimes in production. The IOException/IllegalArgumentException looks like this: java.io.IOException: Problem adding row to file:/tmp/TemporaryDiskFileHandler-220525-060004-17205419383800653529/cmd-220525-071515-1571725702009800891/tmp-220525-071515-16370952990878138938.file at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:723) at io.wallee.analyticsserver.job.util.orc.AsOrcWriter.flushBatch(AsOrcWriter.java:218) at io.wallee.analyticsserver.job.util.orc.AsOrcWriter.close(AsOrcWriter.java:163) [... SNIP ...] Caused by: java.lang.IllegalArgumentException: Out of bounds decimal64 1000000000000000000 at org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal64(ColumnStatisticsImpl.java:1309) at org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1292) at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:103) at org.apache.orc.impl.writer.DecimalTreeWriter.writeBatch(DecimalTreeWriter.java:163) at org.apache.orc.impl.writer.StructTreeWriter.writeRootBatch(StructTreeWriter.java:56) at org.apache.orc.impl.WriterImpl.addRowBatch(WriterImpl.java:699) ... 17 more I tried in tests to write BigDecimal values of size 1000000000000000000 and even much bigger, but I could not reproduce this problem. I attached the sources of our AsOrcWriter class for reference. (It is pretty much self-containing and the code calling it should not matter much.) We are currently using org.apache.orc:orc-core:1.7.3, but I plan to upgrade to the latest version in the coming week (though I don't have much hope that this will fix the problems). As I am trying to fix these issues for two weeks now without any success I would be very happy for any kind of help or hint what I could do to fix it. Greetings, Matthias -- Matthias Meier Software Developer / CISO M + 41 78 752 67 86 wallee.com Disclaimer <https://wallee.com/disclaimer.html> • Legal information <https://wallee.com/imprint.html>
/** * */ package io.wallee.analyticsserver.job.util.orc; import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; import java.util.Collection; import java.util.Formatter; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Helper class to write ORC files. * <p> * Instances are <strong>not threadsafe</strong>! * * @author Matthias Meier */ public final class AsOrcWriter implements Closeable { private static final Logger log = LoggerFactory.getLogger(AsOrcWriter.class); private final String debugContext; private final Path path; private final TypeDescription typeDescription; private final Writer writer; private final VectorizedRowBatch batch; private long batchCnt = 0; private Map<String, Object> currentValue; /** * Constructor. * * @param path * the path of the file to write * @param typeDescription * the description of the (struct) type to write * @param bloomFilterColumns * the columns for which bloom filters should be enabled * @param debugContext * optional context information which is appended to log statements to assist in debugging problems * * @throws UncheckedIOException * if opening the file for writing fails due to some I/O error * @throws IllegalArgumentException * if the specified {@code typeDescription} has a category different from * {@link org.apache.orc.TypeDescription.Category#STRUCT STRUCT} * @throws NullPointerException * if any of the parameters are {@code null} or if the {@code bloomFilterColumns} collection contains * any {@code null} values */ public AsOrcWriter(final Path path, final TypeDescription typeDescription, final Collection<? extends CharSequence> bloomFilterColumns, final String debugContext) { this.debugContext = debugContext; this.path = Objects.requireNonNull(path); this.typeDescription = Objects.requireNonNull(typeDescription); if (this.typeDescription.getCategory() != TypeDescription.Category.STRUCT) { throw new IllegalArgumentException( "Can only handle type descriptions of category STRUCT but the specified typeDescription has category: " + this.typeDescription.getCategory()); } Objects.requireNonNull(bloomFilterColumns); final Configuration conf = new Configuration(); final OrcFile.WriterOptions opts = OrcFile.writerOptions(conf); opts.setSchema(typeDescription); opts.bloomFilterColumns(String.join(",", bloomFilterColumns)); final org.apache.hadoop.fs.Path hpath = new org.apache.hadoop.fs.Path(path.toUri()); try { // If the file already exists, we delete it, it will be "overwritten" (instead of the writer factory method // to complain about an existing file): Files.deleteIfExists(path); this.writer = OrcFile.createWriter(hpath, opts); } catch (final IOException exc) { throw new UncheckedIOException("Failed to open file '" + path + "' for writing: " + exc, exc); } this.batch = typeDescription.createRowBatch(); } /** * @return the path of the local file into which we write */ public Path getPath() { return path; } /** * Returns the number of batches written so far. The batch count gets updated when flushing a batch, so it will not * include the batch which is currently being written. To get an accurate batch count this method should be called * after closing the writer. * * @return the number of batches written so far */ public long getNumberOfBatches() { return batchCnt; } /** * Returns the number of rows in file. The row count gets updated when flushing the stripes. To get an accurate row * count this method should be called after closing the writer. * * @return the number of rows in file */ public long getNumberOfRows() { return writer.getNumberOfRows(); } /** * Returns the deserialized data size. The raw data size will be computed when writing the file footer. Hence the * raw data size value will be available only after closing the writer (and otherwise be zero). * * @return the deserialized data size (in bytes) */ public long getRawDataSize() { return writer.getRawDataSize(); } /** * Flushes any remaining buffered data and closes the underlying file stream. * * @throws UncheckedIOException * if writing to the file or closing the file stream fails due to some I/O error */ @Override public void close() { try (final Closeable closer = () -> { writer.close(); }) { if (batch.size != 0) { try { flushBatch(); } catch (final IOException exc1) { if (log.isTraceEnabled()) { log.trace(batch.toString(), exc1); } throw new UncheckedIOException("Failed to flush batch to '" + path + "': " + exc1, exc1); } } } catch (final IOException exc) { throw new UncheckedIOException("Failed to close underlying Orc writer to '" + path + "': " + exc, exc); } } /** * Writes a row into the ORC file. * * @param value * the data of the row to write (as a property map conforming to this writers type description}) * * @throws UncheckedIOException * if writing to the file fails due to some I/O error * @throws IllegalArgumentException * if the specified {@code value} cannot be converted to an Orc row conforming to the * {@link TypeDescription} specified at construction * @throws NullPointerException * if {@code value} is {@code null} */ public void write(final Map<String, Object> value) { Objects.requireNonNull(value); currentValue = value; try { final int row = batch.size++; addStruct(row, "{" + batchCnt + "/" + row + "}", typeDescription, batch.cols, value); if (batch.size == batch.getMaxSize()) { try { flushBatch(); } catch (final IOException exc) { if (log.isTraceEnabled()) { log.trace(batch.toString(), exc); } throw new UncheckedIOException("Failed to flush batch to '" + path + "' at row " + row + ": " + exc, exc); } } } finally { currentValue = null; } } private void flushBatch() throws IOException { try { writer.addRowBatch(batch); batch.reset(); batchCnt++; } catch (final Throwable t) { try { final String msg; try (final Formatter fmt = new Formatter()) { fmt.format("Failure when flushing batch: %s", t); String cvs = currentValue == null ? "null" : currentValue.toString(); if (cvs.length() > 240) { cvs = cvs.substring(0, 240) + "[...SNIP...]"; } fmt.format("\n currentValue: %s", currentValue); fmt.format("\n numberOfRows: %s", getNumberOfRows()); fmt.format("\n batchCnt: %s", batchCnt); fmt.format("\n batch.size: %d", batch.size); fmt.format("\n batch.maxSize: %d", batch.getMaxSize()); if (batch.cols == null) { fmt.format("\n batch.cols: null"); } else { fmt.format("\n batch.cols (%d):", batch.cols.length); for (int i = 0; i < batch.cols.length; i++) { final ColumnVector cv = batch.cols[i]; if (cv == null) { fmt.format("\n [%3d] null", i); } else { fmt.format("\n [%3d] %s:", i, cv.type); fmt.format("\n ref: %d", cv.getRef()); fmt.format("\n isRepeating: %s", cv.isRepeating); fmt.format("\n noNulls: %s", cv.noNulls); fmt.format("\n isNull: %s", formatBooleanArrayTruncated(cv.isNull)); } } } if (debugContext != null) { fmt.format("\n Context: %s", debugContext); } msg = fmt.toString(); } log.info(msg, t); } catch (Throwable tt) { // Just a precaution in case the above logging code (which is rather complex) should contain any bugs // which lead to exceptions being thrown when formatting the log message: log.error("Failed to log detail info about failure on flushing batch: " + tt, tt); } throw t; } } private static void addStruct(final int row, final String path, final TypeDescription td, final ColumnVector[] childColumnVectors, final Map<?, ?> value) { final List<String> fieldNames = td.getFieldNames(); final int sz = fieldNames.size(); if (sz > 0) { final List<TypeDescription> fieldTypes = td.getChildren(); // TODO: should we ensureSize(...) here!? final StringBuilder p = new StringBuilder(path.length() + 40); p.append(path).append('.'); final int pPref = p.length(); for (int fieldIdx = 0; fieldIdx < sz; fieldIdx++) { final String cn = fieldNames.get(fieldIdx); p.setLength(pPref); p.append(cn); final TypeDescription ctd = fieldTypes.get(fieldIdx); final ColumnVector ccv = childColumnVectors[fieldIdx]; addValue(row, p.toString(), ctd, ccv, value.get(cn)); } } } private static void addValue(final int row, final String path, final TypeDescription td, final ColumnVector cv, final Object value) { if (value == null) { cv.isNull[row] = true; cv.noNulls = false; } else { // // Note: See the private method 'ColumnVector createColumn(int maxSize)' in 'org.apache.orc.TypeDescription' // to see which column vector type is used for which category! // switch (td.getCategory()) { case STRUCT: // StructColumnVector final StructColumnVector scv = (StructColumnVector) cv; final Map<?, ?> structValue = castValue(path, Map.class, value); addStruct(row, path, td, scv.fields, structValue); return; case LIST: // ListColumnVector final List<?> listValue = castValue(path, List.class, value); addList(row, path, td, cv, listValue); return; case MAP: // MapColumnVector final Map<?, ?> mapValue = castValue(path, Map.class, value); addMap(row, path, td, cv, mapValue); return; case BOOLEAN: // LongColumnVector final long booleanValue = castValue(path, Boolean.class, value).booleanValue() ? 1 : 0; addLongValue(row, cv, booleanValue); return; case DATE: // LongColumnVector case LONG: final long longValue = castValue(path, Long.class, value).longValue(); addLongValue(row, cv, longValue); return; case DECIMAL: // DecimalColumnVector final BigDecimal decimalValue = castValue(path, BigDecimal.class, value); addDecimalValue(row, cv, decimalValue); return; case DOUBLE: // DoubleColumnVector final double doubleValue = castValue(path, Double.class, value).doubleValue(); addDoubleValue(row, cv, doubleValue); return; case STRING: // BytesColumnVector final String stringValue = castValue(path, String.class, value); addBytesValueFromString(row, cv, stringValue); return; case INT: // LongColumnVector final int intValue = castValue(path, Integer.class, value).intValue(); addLongValue(row, cv, intValue); return; case TIMESTAMP: // TimestampColumnVector final Instant timestampValue = castValue(path, Instant.class, value); addTimestampValue(row, cv, timestampValue); return; case BINARY: case BYTE: case CHAR: case FLOAT: case SHORT: case UNION: case VARCHAR: case TIMESTAMP_INSTANT: throw new IllegalStateException( "Unsupported Orc category: " + td.getCategory() + " (" + cv.getClass().getName() + ")"); } throw new IllegalStateException( "Unknown Orc category: " + td.getCategory() + " (" + cv.getClass().getName() + ")"); } } private static void addList(final int row, final String path, final TypeDescription td, final ColumnVector cv, final List<?> value) { final int sz = value.size(); if (sz > 0) { final ListColumnVector lcv = (ListColumnVector) cv; final TypeDescription childType = td.getChildren().get(0); final ColumnVector ccv = lcv.child; final int offset = lcv.childCount; lcv.offsets[row] = offset; lcv.lengths[row] = sz; lcv.childCount += sz; ccv.ensureSize(lcv.childCount, true); final StringBuilder p = new StringBuilder(path.length() + 6); p.append(path).append('['); final int pPref = p.length(); for (int i = 0; i < sz; i++) { final int childRow = offset + i; p.setLength(pPref); p.append(i).append(']'); addValue(childRow, p.toString(), childType, ccv, value.get(i)); } } } private static void addMap(final int row, final String path, final TypeDescription td, final ColumnVector cv, final Map<?, ?> value) { final int sz = value.size(); if (sz > 0) { final MapColumnVector mcv = (MapColumnVector) cv; final TypeDescription keyType = td.getChildren().get(0); if (keyType.getCategory() != TypeDescription.Category.STRING) { throw new IllegalStateException( "Unsupported Orc category for map key (only STRING supported): " + td.getCategory()); } final ColumnVector kcv = mcv.keys; final TypeDescription valueType = td.getChildren().get(1); final ColumnVector ccv = mcv.values; final int offset = mcv.childCount; mcv.offsets[row] = offset; mcv.lengths[row] = sz; mcv.childCount += sz; kcv.ensureSize(mcv.childCount, true); ccv.ensureSize(mcv.childCount, true); final StringBuilder p = new StringBuilder(path.length() + 6); p.append(path).append('['); int childRow = offset; for (final Map.Entry<?, ?> e : value.entrySet()) { if (e.getKey() == null) { throw new IllegalArgumentException(path + ": The map contains a null key which is not allowed."); } final String k = e.getKey().toString(); p.append(k).append(']'); addBytesValueFromString(childRow, kcv, k); final Object v = e.getValue(); addValue(childRow, p.toString(), valueType, ccv, v); childRow++; } } } private static void addBytesValueFromString(final int row, final ColumnVector cv, final String value) { final byte[] bytes = value.getBytes(StandardCharsets.UTF_8); addBytesValue(row, cv, bytes); } private static void addBytesValue(final int row, final ColumnVector cv, final byte[] value) { assert value != null; final BytesColumnVector bcv = (BytesColumnVector) cv; bcv.vector[row] = value; bcv.start[row] = 0; bcv.length[row] = value.length; } private static void addDecimalValue(final int row, final ColumnVector cv, final BigDecimal value) { assert value != null; final DecimalColumnVector dcv = (DecimalColumnVector) cv; final BigDecimal adjustedValue = value.scale() > 0 ? value : value.setScale(0); // final BigDecimal adjustedValue = value; dcv.vector[row] = new HiveDecimalWritable(adjustedValue.unscaledValue().toByteArray(), adjustedValue.scale()); } private static void addDoubleValue(final int row, final ColumnVector cv, final double value) { final DoubleColumnVector dcv = (DoubleColumnVector) cv; dcv.vector[row] = value; } private static void addLongValue(final int row, final ColumnVector cv, final long value) { final LongColumnVector lcv = (LongColumnVector) cv; lcv.vector[row] = value; } private static void addTimestampValue(final int row, final ColumnVector cv, final Instant value) { assert value != null; final TimestampColumnVector tcv = (TimestampColumnVector) cv; tcv.time[row] = value.getEpochSecond() * 1000; tcv.nanos[row] = value.getNano(); } private static <E> E castValue(final String path, final Class<E> targetClass, final Object value) { assert value != null; try { return targetClass.cast(value); } catch (final ClassCastException cce) { throw new IllegalArgumentException(path + ": Failed to cast source value of type '" + value.getClass().getName() + "' to expected target type '" + targetClass.getName() + "'.", cce); } } static String formatBooleanArrayTruncated(final boolean[] arr) { if (arr == null) { return "null"; } else if (arr.length > 112) { return formatBooleanArray(arr, 0, 50) + "[...SNIP...]" + formatBooleanArray(arr, arr.length - 50, arr.length); } else { return formatBooleanArray(arr, 0, arr.length); } } private static String formatBooleanArray(final boolean[] arr, final int start, final int end) { final StringBuilder sb = new StringBuilder(); for (int i = start; i < end; i++) { sb.append(arr[i] ? "1" : "0"); } return sb.toString(); } }