Hello, two questions for y'all:

Q1) I'm trying out Arrow IPC format, comparing it to Parquet. I'd like to
be able to write to a file in batches with dictionary encoding for the
"String" columns. Per the format docs at
https://arrow.apache.org/docs/format/Columnar.html#dictionary-messages,
each batch should record a dictionary when `isDelta` is false (which it
seems to be in the Java library). However, `ArrowFileWriter` and
`ArrowFileReader` seem to expect a single dictionary in the first batch and
fail to flush or read (respectively) dictionaries on subsequent batches. I
can easily tweak the state with reflection so that the dictionaries are
flushed and read properly, but I'd like to know if I'm missing something
before trying to fix up the underlying code. See the code example below.

Q2) The dictionary recipe at
https://arrow.apache.org/cookbook/java/io.html#id21 and the unit tests both
create `VarCharVector`s that are then encoded and decoded into new vectors
for use. I'd rather avoid the encode/decode round trip and work with the
dictionary indices directly. Is the following code the best way to do it?

```
  @Test
  public void testMultiBatchWithDictionary() throws Exception {
    File file = new File("target/mytest_multi_dictionary.arrow");
    Map<String, Integer> stringToIndex = new HashMap<>();

    try (VarCharVector dictionaryVector = new VarCharVector("dictionary", allocator)) {
      DictionaryEncoding dictionaryEncoding =
          new DictionaryEncoding(42, false, new ArrowType.Int(16, false));

      Dictionary dictionary = new Dictionary(dictionaryVector, dictionaryEncoding);
      DictionaryProvider.MapDictionaryProvider provider =
          new DictionaryProvider.MapDictionaryProvider();
      provider.put(dictionary);

      try (UInt2Vector vector = new UInt2Vector(
          "vector",
          new FieldType(false, new ArrowType.Int(16, false), dictionaryEncoding),
          allocator)) {
        vector.allocateNew(4);
        dictionaryVector.allocateNew(4);

        dictionaryVector.set(0, "foo".getBytes(StandardCharsets.UTF_8));
        stringToIndex.put("foo", 0);
        dictionaryVector.set(1, "bar".getBytes(StandardCharsets.UTF_8));
        stringToIndex.put("bar", 1);

        vector.set(0, stringToIndex.get("foo"));
        vector.set(1, stringToIndex.get("bar"));
        vector.set(2, stringToIndex.get("bar"));
        vector.set(3, stringToIndex.get("foo"));

        vector.setValueCount(4);
        dictionaryVector.setValueCount(4); // NOTE: Should be 2 really.

        VectorSchemaRoot root = VectorSchemaRoot.of(dictionaryVector, vector);
        try (FileOutputStream fileOutputStream = new FileOutputStream(file);
             ArrowFileWriter arrowWriter =
                 new ArrowFileWriter(root, provider, fileOutputStream.getChannel())) {

          // batch 1
          arrowWriter.start();
          arrowWriter.writeBatch();
          dictionaryVector.reset();
          vector.reset();
          stringToIndex.clear();

          // TODO - This is needed to write the next dictionary
          java.lang.reflect.Field dictionariesWritten =
              ArrowFileWriter.class.getDeclaredField("dictionariesWritten");
          dictionariesWritten.setAccessible(true);
          dictionariesWritten.set(arrowWriter, false);

          // note the order is different for the strings
          dictionaryVector.set(0, "bar".getBytes(StandardCharsets.UTF_8));
          stringToIndex.put("bar", 0);
          dictionaryVector.set(1, "foo".getBytes(StandardCharsets.UTF_8));
          stringToIndex.put("foo", 1);

          vector.set(0, stringToIndex.get("bar"));
          vector.set(1, stringToIndex.get("bar"));
          vector.set(2, stringToIndex.get("foo"));
          vector.set(3, stringToIndex.get("foo"));

          dictionaryVector.setValueCount(2);
          vector.setValueCount(4);

          arrowWriter.writeBatch();
          arrowWriter.end();
        }
      }
    }

    try (FileInputStream fileInputStream = new FileInputStream(file);
         ArrowFileReader reader =
             new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
      VectorSchemaRoot root = reader.getVectorSchemaRoot();
      assertEquals(2, reader.getRecordBlocks().size());
      assertTrue(reader.loadNextBatch());

      FieldVector encoded = root.getVector("vector");
      DictionaryEncoding dictionaryEncoding = encoded.getField().getDictionary();
      Dictionary dictionary = reader.getDictionaryVectors().get(dictionaryEncoding.getId());
      try (ValueVector decoded = DictionaryEncoder.decode(encoded, dictionary)) {
        assertEquals("foo", decoded.getObject(0).toString());
        assertEquals("bar", decoded.getObject(1).toString());
        assertEquals("bar", decoded.getObject(2).toString());
        assertEquals("foo", decoded.getObject(3).toString());
      }

      assertTrue(reader.loadNextBatch());
      // TODO without the load, values are mapped only to the first dictionary.
      Method loadDictionary =
          ArrowReader.class.getDeclaredMethod("loadDictionary", ArrowDictionaryBatch.class);
      loadDictionary.invoke(reader, reader.readDictionary());

      try (ValueVector decoded = DictionaryEncoder.decode(encoded, dictionary)) {
        System.out.println(decoded);
        assertEquals("bar", decoded.getObject(0).toString());
        assertEquals("bar", decoded.getObject(1).toString());
        assertEquals("foo", decoded.getObject(2).toString());
        assertEquals("foo", decoded.getObject(3).toString());
      }
    }
  }
```
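
For Q2, the `stringToIndex` bookkeeping in the test could probably be factored out into a small helper. The following is only a rough sketch in plain Java with no Arrow dependency; `BatchDictionaryBuilder` and its method names are hypothetical, not part of the Arrow API. It assigns indices in first-seen order and is cleared between batches, the same way the test clears the map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not an Arrow class): tracks the per-batch mapping
// from string value to dictionary index.
final class BatchDictionaryBuilder {
  private final Map<String, Integer> stringToIndex = new HashMap<>();
  private final List<String> values = new ArrayList<>();

  // Returns the index for value, assigning the next free index the first
  // time the value is seen in the current batch.
  int indexOf(String value) {
    Integer existing = stringToIndex.get(value);
    if (existing != null) {
      return existing;
    }
    int index = values.size();
    stringToIndex.put(value, index);
    values.add(value);
    return index;
  }

  // Dictionary entries in index order, as a defensive copy.
  List<String> values() {
    return new ArrayList<>(values);
  }

  // Clears all state so the next batch starts a fresh dictionary.
  void reset() {
    stringToIndex.clear();
    values.clear();
  }
}
```

In the test above, the idea would be to call `vector.set(row, builder.indexOf(s))` for each row, copy `builder.values()` into `dictionaryVector` before `writeBatch()`, and call `builder.reset()` afterwards.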
