[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-23 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r327281197
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,96 +305,179 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory {
 
 Review comment:
   Agreed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-23 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r327265389
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,96 +305,179 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+  // The purpose of this uuid is to be able to know from two paths that they were written by the same operation.
+  // That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all
+  // with a recursive listing and grep.
+  private final String uuid = UUID.randomUUID().toString();
+  private int fileCount;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+this.fileCount = 0;
+  }
+
+  private synchronized String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount++));
+  }
+
+  /**
+   * Generates EncryptedOutputFile for UnpartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile() {
+OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+return encryptionManager.encrypt(file);
+  }
+
+  /**
+   * Generates EncryptedOutputFile for PartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile(PartitionKey key) {
+OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+return encryptionManager.encrypt(rawOutputFile);
+  }
+}
   }
 
   private interface AppenderFactory {
 FileAppender newAppender(OutputFile file, FileFormat format);
   }
 
-  private static class UnpartitionedWriter implements DataWriter, Closeable {
-private final FileIO fileIo;
-private FileAppender appender = null;
-private Metrics metrics = null;
-private List offsetRanges = null;
-private final EncryptedOutputFile file;
+  private interface OutputFileFactory {
+T newOutputFile();
+T newOutputFile(PartitionKey key);
+  }
 
-UnpartitionedWriter(
-EncryptedOutputFile outputFile,
-FileFormat format,
-AppenderFactory factory,
-FileIO fileIo) {
+  @SuppressWarnings("checkstyle:VisibilityModifier") // direct access desired from sub-classes for performance.
+  private abstract static class BaseWriter implements DataWriter {
+protected static final int ROWS_DIVISOR = 1000;
+
+protected final Set completedPartitions = Sets.newHashSet();
 
 Review comment:
   Will fix.
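   The `completedPartitions` field in the hunk above tracks partitions whose files are already closed. A minimal sketch of how such a set can be used, assuming rows arrive clustered by partition key, so a key that reappears after its file was closed signals unsorted input. `ClusteredPartitionWriter`, its String keys, and the in-memory row lists are hypothetical stand-ins for `PartitionKey` and real data files:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified partitioned writer. A String stands in for PartitionKey
// and a list of rows stands in for an open data file.
class ClusteredPartitionWriter {
  private final Set<String> completedPartitions = new HashSet<>();
  private final List<String> completedFiles = new ArrayList<>();
  private String currentKey = null;
  private List<String> currentRows = null;

  void write(String partitionKey, String row) {
    if (!Objects.equals(partitionKey, currentKey)) {
      if (completedPartitions.contains(partitionKey)) {
        // This partition's file was already closed, so the input is not clustered.
        throw new IllegalStateException("Already closed files for partition: " + partitionKey);
      }
      closeCurrent();
      currentKey = partitionKey;
      currentRows = new ArrayList<>();
    }
    currentRows.add(row);
  }

  // Closes the open "file" (if any), recording it as "key:rowCount",
  // and remembers its partition as completed.
  void closeCurrent() {
    if (currentKey != null) {
      completedFiles.add(currentKey + ":" + currentRows.size());
      completedPartitions.add(currentKey);
      currentKey = null;
      currentRows = null;
    }
  }

  List<String> completedFiles() {
    return completedFiles;
  }
}
```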





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-23 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r327264957
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,96 +305,179 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+  // The purpose of this uuid is to be able to know from two paths that they were written by the same operation.
+  // That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all
+  // with a recursive listing and grep.
+  private final String uuid = UUID.randomUUID().toString();
+  private int fileCount;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+this.fileCount = 0;
+  }
+
+  private synchronized String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount++));
+  }
+
+  /**
+   * Generates EncryptedOutputFile for UnpartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile() {
+OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+return encryptionManager.encrypt(file);
+  }
+
+  /**
+   * Generates EncryptedOutputFile for PartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile(PartitionKey key) {
+OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+return encryptionManager.encrypt(rawOutputFile);
+  }
+}
   }
 
   private interface AppenderFactory {
 FileAppender newAppender(OutputFile file, FileFormat format);
   }
 
-  private static class UnpartitionedWriter implements DataWriter, Closeable {
-private final FileIO fileIo;
-private FileAppender appender = null;
-private Metrics metrics = null;
-private List offsetRanges = null;
-private final EncryptedOutputFile file;
+  private interface OutputFileFactory {
+T newOutputFile();
+T newOutputFile(PartitionKey key);
+  }
 
-UnpartitionedWriter(
-EncryptedOutputFile outputFile,
-FileFormat format,
-AppenderFactory factory,
-FileIO fileIo) {
+  @SuppressWarnings("checkstyle:VisibilityModifier") // direct access desired from sub-classes for performance.
+  private abstract static class BaseWriter implements DataWriter {
+protected static final int ROWS_DIVISOR = 1000;
+
+protected final Set completedPartitions = Sets.newHashSet();
+protected final List completedFiles = Lists.newArrayList();
+protected final PartitionSpec spec;
+protected final FileFormat format;
+protected final AppenderFactory appenderFactory;
+protected final OutputFileFactory fileFactory;
+protected final PartitionKey key;
+protected final FileIO fileIo;
+protected final long targetFileSize;
+protected PartitionKey currentKey = null;
+protected FileAppender currentAppender = null;
+protected EncryptedOutputFile currentFile = null;
+protected long currentRows;
 
 Review comment:
   I'd still need the `currentKey` field to be accessible, since it is used in `openCurrent()` and `closeCurrent()` in the base class, but also in `PartitionedWriter`'s `write()`.
   
   So I can:
   1) Only make `currentKey` protected.
   or
   2) Add getter/setter for this particular field.
   
   WDYT?
   
   > Does performance really degrade when the subclasses are a bit more separated from this base class?
   
   A method call is more expensive than a field access, although I admit that the JIT compiler will most likely inline such simple accessors right away. So will fix.
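   For comparison, Option 2 (accessors instead of a protected field) could look like the sketch below. `KeyedBaseWriter`, `KeyedPartitionedWriter`, the String key, and the `filesOpened` counter are hypothetical simplifications, not the PR's classes:

```java
// Hypothetical sketch of "Option 2": the base class keeps currentKey private and
// exposes protected accessors, so subclasses never touch the field directly.
abstract class KeyedBaseWriter {
  private String currentKey = null;

  protected String currentKey() {
    return currentKey;
  }

  protected void setCurrentKey(String key) {
    this.currentKey = key;
  }

  abstract void write(String key, String row);
}

class KeyedPartitionedWriter extends KeyedBaseWriter {
  private int filesOpened = 0;

  @Override
  void write(String key, String row) {
    // Trivial accessors like these are typically inlined by the JIT.
    if (!key.equals(currentKey())) {
      // A new key means a new file would be opened here.
      setCurrentKey(key);
      filesOpened++;
    }
  }

  int filesOpened() {
    return filesOpened;
  }
}
```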





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-23 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r327261980
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,96 +305,179 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+  // The purpose of this uuid is to be able to know from two paths that they were written by the same operation.
+  // That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all
+  // with a recursive listing and grep.
+  private final String uuid = UUID.randomUUID().toString();
+  private int fileCount;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+this.fileCount = 0;
+  }
+
+  private synchronized String generateFilename() {
 
 Review comment:
   Agreed.
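   The naming scheme used by `generateFilename()` can be sketched in isolation. This assumes the extension is appended as `.` plus the format name, standing in for `format.addExtension(...)`; `FileNameGenerator` and its constructors are hypothetical:

```java
import java.util.Locale;
import java.util.UUID;

// Hypothetical sketch of the generateFilename() scheme: partition id, task id,
// one UUID shared by every file this generator names, and a per-generator counter.
class FileNameGenerator {
  private final int partitionId;
  private final long taskId;
  private final String uuid;
  private final String extension;
  private int fileCount = 0;

  FileNameGenerator(int partitionId, long taskId, String uuid, String extension) {
    this.partitionId = partitionId;
    this.taskId = taskId;
    this.uuid = uuid;
    this.extension = extension;
  }

  FileNameGenerator(int partitionId, long taskId, String extension) {
    this(partitionId, taskId, UUID.randomUUID().toString(), extension);
  }

  // synchronized so that concurrent callers never reuse a counter value.
  synchronized String next() {
    return String.format(Locale.ROOT, "%05d-%d-%s-%05d.%s",
        partitionId, taskId, uuid, fileCount++, extension);
  }
}
```

   For example, `new FileNameGenerator(3, 42L, "abc", "parquet")` yields `00003-42-abc-00000.parquet` and then `00003-42-abc-00001.parquet`: the shared UUID groups the files, and the counter keeps rolled files distinct.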





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-16 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r324935182
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+  }
+
+  private String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, UUID.randomUUID().toString()));
+  }
+
+  /**
+   * Generates EncryptedOutputFile for UnpartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile() {
+OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+return encryptionManager.encrypt(file);
+  }
+
+  /**
+   * Generates EncryptedOutputFile for PartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile(PartitionKey key) {
+OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+return encryptionManager.encrypt(rawOutputFile);
+  }
+}
   }
 
   private interface AppenderFactory {
 FileAppender newAppender(OutputFile file, FileFormat format);
   }
 
-  private static class UnpartitionedWriter implements DataWriter, Closeable {
+  private interface OutputFileFactory {
+T newOutputFile();
+T newOutputFile(PartitionKey key);
+  }
+
+  private static class UnpartitionedWriter implements DataWriter {
 private final FileIO fileIo;
-private FileAppender appender = null;
-private Metrics metrics = null;
-private List offsetRanges = null;
-private final EncryptedOutputFile file;
+private FileAppender currentAppender = null;
+private final OutputFileFactory fileFactory;
+private final FileFormat format;
+private final AppenderFactory appenderFactory;
+private EncryptedOutputFile currentFile = null;
+private final List completedFiles = Lists.newArrayList();
+private final long targetFileSize;
 
 UnpartitionedWriter(
-EncryptedOutputFile outputFile,
+OutputFileFactory fileFactory,
 FileFormat format,
-AppenderFactory factory,
-FileIO fileIo) {
+AppenderFactory appenderFactory,
+FileIO fileIo,
+long targetFileSize) {
+  this.fileFactory = fileFactory;
+  this.format = format;
+  this.appenderFactory = appenderFactory;
   this.fileIo = fileIo;
-  this.file = outputFile;
-  this.appender = factory.newAppender(file.encryptingOutputFile(), format);
+  this.targetFileSize = targetFileSize;
+
+  openCurrent();
 }
 
 @Override
-public void write(InternalRow record) {
-  appender.add(record);
+public void write(InternalRow record) throws IOException {
 
 Review comment:
   Fixed.





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-11 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r323429579
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+  }
+
+  private String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, UUID.randomUUID().toString()));
+  }
+
+  /**
+   * Generates EncryptedOutputFile for UnpartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile() {
+OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+return encryptionManager.encrypt(file);
+  }
+
+  /**
+   * Generates EncryptedOutputFile for PartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile(PartitionKey key) {
+OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+return encryptionManager.encrypt(rawOutputFile);
+  }
+}
   }
 
   private interface AppenderFactory {
 FileAppender newAppender(OutputFile file, FileFormat format);
   }
 
-  private static class UnpartitionedWriter implements DataWriter, Closeable {
+  private interface OutputFileFactory {
+T newOutputFile();
+T newOutputFile(PartitionKey key);
+  }
+
+  private static class UnpartitionedWriter implements DataWriter {
 private final FileIO fileIo;
-private FileAppender appender = null;
-private Metrics metrics = null;
-private List offsetRanges = null;
-private final EncryptedOutputFile file;
+private FileAppender currentAppender = null;
+private final OutputFileFactory fileFactory;
+private final FileFormat format;
+private final AppenderFactory appenderFactory;
+private EncryptedOutputFile currentFile = null;
+private final List completedFiles = Lists.newArrayList();
+private final long targetFileSize;
 
 UnpartitionedWriter(
-EncryptedOutputFile outputFile,
+OutputFileFactory fileFactory,
 FileFormat format,
-AppenderFactory factory,
-FileIO fileIo) {
+AppenderFactory appenderFactory,
+FileIO fileIo,
+long targetFileSize) {
+  this.fileFactory = fileFactory;
+  this.format = format;
+  this.appenderFactory = appenderFactory;
   this.fileIo = fileIo;
-  this.file = outputFile;
-  this.appender = factory.newAppender(file.encryptingOutputFile(), format);
+  this.targetFileSize = targetFileSize;
+
+  openCurrent();
 }
 
 @Override
-public void write(InternalRow record) {
-  appender.add(record);
+public void write(InternalRow record) throws IOException {
 
 Review comment:
   Upon further testing, we found that the generated files miss the target file size by a whole row group.
   
   Looking at `ParquetWriter`, `currentAppender.length()` maps to:
   
   
https://github.com/apache/incubator-iceberg/blob/776210e3e49591a47c5d3d93ecd6d5c8b22e19b5/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java#L126-L133
   
   which in turn calls `ParquetFileWriter.getPos()` from parquet-hadoop:
   ```java
 /**
  * @return the current position in the underlying file
  * @throws IOException if there is an error while getting the current stream's position
  */
 public long getPos() throws IOException {
   return out.getPos();
 }
   ```
   
   Here `out` is the underlying output stream, so this length estimate does not include the row group that is still being buffered in memory.
   
   It seems `ParquetWriter` should really return `writer.getPos() + writeStore.getBufferedSize()` instead?
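   To illustrate the gap described above, here is a toy model of a writer that buffers a row group in memory before flushing it. `BufferedRowGroupWriter` and its byte counts are hypothetical: `flushedLength()` plays the role of `writer.getPos()`, and `estimatedLength()` plays the role of the proposed `writer.getPos() + writeStore.getBufferedSize()`:

```java
// Toy model of a Parquet-style writer: rows accumulate in an in-memory row group
// and are only written to the output stream once the row group is full.
class BufferedRowGroupWriter {
  private final long rowGroupSizeBytes;
  private long flushedBytes = 0;   // like getPos(): bytes already in the stream
  private long bufferedBytes = 0;  // like the buffered size: the open row group

  BufferedRowGroupWriter(long rowGroupSizeBytes) {
    this.rowGroupSizeBytes = rowGroupSizeBytes;
  }

  void add(long rowSizeBytes) {
    bufferedBytes += rowSizeBytes;
    if (bufferedBytes >= rowGroupSizeBytes) {
      // Flush the full row group to the "stream".
      flushedBytes += bufferedBytes;
      bufferedBytes = 0;
    }
  }

  // Position-only length: ignores the row group still held in memory.
  long flushedLength() {
    return flushedBytes;
  }

  // Proposed estimate: stream position plus the buffered, not-yet-flushed bytes.
  long estimatedLength() {
    return flushedBytes + bufferedBytes;
  }
}
```

   After ten 30-byte rows with a 100-byte row group, `flushedLength()` reports 240 while `estimatedLength()` reports 300, so a 250-byte target is only detected by the second.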





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-10 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r322825218
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,96 +304,179 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+  // The purpose of this uuid is to be able to know from two paths that they were written by the same operation.
+  // That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all
+  // with a recursive listing and grep.
+  private final String uuid = UUID.randomUUID().toString();
+  private int fileCount;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+this.fileCount = 0;
+  }
+
+  private synchronized String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount++));
+  }
+
+  /**
+   * Generates EncryptedOutputFile for UnpartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile() {
+OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+return encryptionManager.encrypt(file);
+  }
+
+  /**
+   * Generates EncryptedOutputFile for PartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile(PartitionKey key) {
+OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+return encryptionManager.encrypt(rawOutputFile);
+  }
+}
   }
 
   private interface AppenderFactory {
 FileAppender newAppender(OutputFile file, FileFormat format);
   }
 
-  private static class UnpartitionedWriter implements DataWriter, Closeable {
-private final FileIO fileIo;
-private FileAppender appender = null;
-private Metrics metrics = null;
-private List offsetRanges = null;
-private final EncryptedOutputFile file;
+  private interface OutputFileFactory {
+T newOutputFile();
+T newOutputFile(PartitionKey key);
+  }
 
-UnpartitionedWriter(
-EncryptedOutputFile outputFile,
-FileFormat format,
-AppenderFactory factory,
-FileIO fileIo) {
+  @SuppressWarnings("checkstyle:VisibilityModifier") // direct access desired from sub-classes for performance.
+  private abstract static class BaseWriter implements DataWriter {
+protected static final int ROWS_DIVISOR = 1000;
+
+protected final Set completedPartitions = Sets.newHashSet();
+protected final List completedFiles = Lists.newArrayList();
+protected final PartitionSpec spec;
+protected final FileFormat format;
+protected final AppenderFactory appenderFactory;
+protected final OutputFileFactory fileFactory;
+protected final PartitionKey key;
+protected final FileIO fileIo;
+protected final long targetFileSize;
+protected PartitionKey currentKey = null;
+protected FileAppender currentAppender = null;
+protected EncryptedOutputFile currentFile = null;
+protected long currentRows;
+
+BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory appenderFactory,
+OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
 
 Review comment:
   All right, I fixed it to match the `WriterFactory` constructor in the `Writer` class, which follows Option 1.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-09 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r322507425
 
 

 ##
 File path: site/docs/configuration.md
 ##
 @@ -25,6 +25,7 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.metadata.compression-codec   | none   | Metadata compression codec; none or gzip   |
 | write.metadata.metrics.default | truncate(16)   | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
 | write.metadata.metrics.column.col1 | (not set)  | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
+| write.target-file-size | Long.MAX_VALUE | Controls the size of files generated to target about this many bytes. |
 
 Review comment:
   Fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-09 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r322507446
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -102,6 +102,10 @@
 this.replacePartitions = replacePartitions;
 this.applicationId = applicationId;
 this.wapId = wapId;
+
+long tableTargetFileSize = Long.parseLong(table.properties().getOrDefault(
 
 Review comment:
   Fixed.
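   A minimal sketch of the property lookup started in the hunk above, assuming the property name `write.target-file-size-bytes` agreed on in this thread and the `Long.MAX_VALUE` default from the configuration table. `TargetFileSize` is a hypothetical helper taking a plain `Map`; the real code reads from `table.properties()`:

```java
import java.util.Map;

// Hypothetical helper: resolves the target data file size from table properties.
final class TargetFileSize {
  static final String PROPERTY = "write.target-file-size-bytes";
  // Default from the configuration table: effectively "never roll to a new file".
  static final long DEFAULT_BYTES = Long.MAX_VALUE;

  private TargetFileSize() {
  }

  static long fromTableProperties(Map<String, String> properties) {
    // Same pattern as the diff: parse the property, falling back to the default.
    return Long.parseLong(properties.getOrDefault(PROPERTY, String.valueOf(DEFAULT_BYTES)));
  }
}
```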


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-09 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r322469154
 
 

 ##
 File path: site/docs/configuration.md
 ##
 @@ -25,6 +25,7 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.metadata.compression-codec   | none   | Metadata compression codec; none or gzip   |
 | write.metadata.metrics.default | truncate(16)   | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
 | write.metadata.metrics.column.col1 | (not set)  | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
+| write.target-file-size | Long.MAX_VALUE | Controls the size of files generated to target about this many bytes. |
 
 Review comment:
   Agreed that being more specific can only help. Will change it to `write.target-file-size-bytes`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-05 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321467275
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -354,59 +353,47 @@ public EncryptedOutputFile newOutputFile(PartitionKey key) {
 T newOutputFile(PartitionKey key);
   }
 
-  private static class UnpartitionedWriter implements DataWriter {
-private static final int ROWS_DIVISOR = 1000;
-
-private final FileIO fileIo;
-private FileAppender currentAppender = null;
-private final OutputFileFactory fileFactory;
-private final FileFormat format;
-private final AppenderFactory appenderFactory;
-private EncryptedOutputFile currentFile = null;
-private final List completedFiles = Lists.newArrayList();
-private final long targetFileSize;
-private long currentRows;
-
-UnpartitionedWriter(
-OutputFileFactory fileFactory,
-FileFormat format,
-AppenderFactory appenderFactory,
-FileIO fileIo,
-long targetFileSize) {
-  this.fileFactory = fileFactory;
+  @SuppressWarnings("checkstyle:VisibilityModifier") // direct access desired from sub-classes for performance.
+  private abstract static class BaseWriter implements DataWriter {
+protected static final int ROWS_DIVISOR = 1000;
+
+protected final Set completedPartitions = Sets.newHashSet();
+protected final List completedFiles = Lists.newArrayList();
+protected final PartitionSpec spec;
+protected final FileFormat format;
+protected final AppenderFactory appenderFactory;
+protected final OutputFileFactory fileFactory;
+protected final PartitionKey key;
+protected final FileIO fileIo;
+protected final long targetFileSize;
+protected PartitionKey currentKey = null;
+protected FileAppender currentAppender = null;
+protected EncryptedOutputFile currentFile = null;
+protected long currentRows;
+
+BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory appenderFactory,
+OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
+  this.spec = spec;
   this.format = format;
   this.appenderFactory = appenderFactory;
+  this.fileFactory = fileFactory;
+  this.key = new PartitionKey(spec);
   this.fileIo = fileIo;
   this.targetFileSize = targetFileSize;
-
-  openCurrent();
 }
 
 @Override
-public void write(InternalRow record) throws IOException {
-  if (currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
-closeCurrent();
-openCurrent();
-  }
-
-  currentAppender.add(record);
-  currentRows++;
-}
+public abstract void write(InternalRow row) throws IOException;
 
 @Override
 public WriterCommitMessage commit() throws IOException {
-  Preconditions.checkArgument(currentAppender != null, "Commit called on a closed writer: %s", this);
 
 Review comment:
   These precondition checks on `currentAppender`, which were previously done only in `UnpartitionedWriter`, are no longer done in `BaseWriter` because they make `PartitionedWriter` fail.
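   The roll check in the removed `write()` above, together with a `commit()` that tolerates having no open file, can be sketched as follows. `RollingWriter` is hypothetical: it adds a fixed number of bytes per row instead of calling an appender's `length()`, and it opens its first file lazily, which is one assumed way a writer can reach `commit()` with no open appender:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical rolling writer. Each row adds bytesPerRow, standing in for
// currentAppender.length(); sizes of closed files are returned on commit.
class RollingWriter {
  static final int ROWS_DIVISOR = 1000;

  private final long targetFileSize;
  private final long bytesPerRow;
  private final List<Long> completedFileSizes = new ArrayList<>();
  private boolean open = false;
  private long currentLength = 0;
  private long currentRows = 0;

  RollingWriter(long targetFileSize, long bytesPerRow) {
    this.targetFileSize = targetFileSize;
    this.bytesPerRow = bytesPerRow;
  }

  void write() {
    if (!open) {
      // Opened lazily: a writer that never receives a row never opens a file.
      openCurrent();
    } else if (currentRows % ROWS_DIVISOR == 0 && currentLength >= targetFileSize) {
      // The length is only checked every ROWS_DIVISOR rows.
      closeCurrent();
      openCurrent();
    }
    currentLength += bytesPerRow;
    currentRows++;
  }

  // No precondition on an open file: commit simply closes whatever is open.
  List<Long> commit() {
    closeCurrent();
    return completedFileSizes;
  }

  private void openCurrent() {
    open = true;
    currentLength = 0;
    currentRows = 0;
  }

  private void closeCurrent() {
    if (open) {
      completedFileSizes.add(currentLength);
      open = false;
    }
  }
}
```

   With a 1,500-byte target, 1-byte rows, and 3,500 writes, the check only fires at row 2,000 of the first file, so `commit()` returns `[2000, 1500]`: checking every `ROWS_DIVISOR` rows lets a file run past the target by up to about that many rows.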


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-05 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321466340
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -425,17 +498,18 @@ public WriterCommitMessage commit() throws IOException {
 
 @Override
 public void abort() throws IOException {
+  closeCurrent();
+
   // clean up files created by this writer
   Tasks.foreach(completedFiles)
   .throwFailureWhenFinished()
   .noRetry()
   .run(file -> fileIo.deleteFile(file.path().toString()));
+}
 
-  if (currentAppender != null) {
-currentAppender.close();
-this.currentAppender = null;
-fileIo.deleteFile(currentFile.encryptingOutputFile());
-  }
+private void openCurrent() {
 
 Review comment:
   I took a stab at the refactoring in this PR, but as a separate commit so that it's clear what has changed.
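   The reordered `abort()` in the hunk above can be sketched as follows, assuming `closeCurrent()` moves the in-progress file into `completedFiles` so that a single loop deletes everything. `AbortableWriter` and its `Consumer<String>` deleter are hypothetical stand-ins for the writer and `fileIo.deleteFile(...)`. The loop attempts every deletion once and throws the first failure only at the end, which is our reading of what `throwFailureWhenFinished()` and `noRetry()` request:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical writer that tracks file paths; deleteFile stands in for fileIo.deleteFile(...).
class AbortableWriter {
  private final List<String> completedFiles = new ArrayList<>();
  private final Consumer<String> deleteFile;
  private String currentFile = null;

  AbortableWriter(Consumer<String> deleteFile) {
    this.deleteFile = deleteFile;
  }

  // Rolls to a new file, completing the previous one.
  void open(String path) {
    closeCurrent();
    currentFile = path;
  }

  // Moves the in-progress file (if any) into completedFiles.
  void closeCurrent() {
    if (currentFile != null) {
      completedFiles.add(currentFile);
      currentFile = null;
    }
  }

  void abort() {
    // Close first so the in-progress file is cleaned up by the same loop.
    closeCurrent();
    RuntimeException failure = null;
    for (String file : completedFiles) {
      try {
        deleteFile.accept(file);  // single attempt, no retry
      } catch (RuntimeException e) {
        // Keep going; remember the first failure and attach later ones to it.
        if (failure == null) {
          failure = e;
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw failure;
    }
  }
}
```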


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-05 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321465758
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+  }
+
+  private String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, UUID.randomUUID().toString()));
 
 Review comment:
   Fixed.





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-05 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321465855
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory<EncryptedOutputFile> {
 
 Review comment:
   Left this as is, per the comment above.





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-05 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321465693
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory<EncryptedOutputFile> {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+  }
+
+  private String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, UUID.randomUUID().toString()));
+  }
+
+  /**
+   * Generates EncryptedOutputFile for UnpartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile() {
+OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+return encryptionManager.encrypt(file);
+  }
+
+  /**
+   * Generates EncryptedOutputFile for PartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile(PartitionKey key) {
+OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+return encryptionManager.encrypt(rawOutputFile);
+  }
+}
   }
 
   private interface AppenderFactory {
 FileAppender newAppender(OutputFile file, FileFormat format);
   }
 
-  private static class UnpartitionedWriter implements DataWriter<InternalRow>, Closeable {
+  private interface OutputFileFactory<T> {
+T newOutputFile();
+T newOutputFile(PartitionKey key);
+  }
+
+  private static class UnpartitionedWriter implements DataWriter<InternalRow> {
 private final FileIO fileIo;
-private FileAppender appender = null;
-private Metrics metrics = null;
-private List offsetRanges = null;
-private final EncryptedOutputFile file;
+private FileAppender currentAppender = null;
+private final OutputFileFactory fileFactory;
+private final FileFormat format;
+private final AppenderFactory appenderFactory;
+private EncryptedOutputFile currentFile = null;
+private final List completedFiles = Lists.newArrayList();
+private final long targetFileSize;
 
 UnpartitionedWriter(
-EncryptedOutputFile outputFile,
+OutputFileFactory fileFactory,
 FileFormat format,
-AppenderFactory factory,
-FileIO fileIo) {
+AppenderFactory appenderFactory,
+FileIO fileIo,
+long targetFileSize) {
+  this.fileFactory = fileFactory;
+  this.format = format;
+  this.appenderFactory = appenderFactory;
   this.fileIo = fileIo;
-  this.file = outputFile;
-  this.appender = factory.newAppender(file.encryptingOutputFile(), format);
+  this.targetFileSize = targetFileSize;
+
+  openCurrent();
 }
 
 @Override
-public void write(InternalRow record) {
-  appender.add(record);
+public void write(InternalRow record) throws IOException {
 
 Review comment:
   Fixed.
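The hunk above replaces the single `EncryptedOutputFile` that `UnpartitionedWriter` used to receive with an `OutputFileFactory`. With a factory, the writer can request a fresh file whenever it rolls over. Below is a sketch of that shape under some assumptions: plain `String` paths stand in for `EncryptedOutputFile`, and a partition path string stands in for `PartitionKey`. `PathFactory` and `SimplePathFactory` are hypothetical names.

```java
// Hypothetical sketch: T is whatever handle the writer needs (EncryptedOutputFile
// in the PR); here it is a plain String path.
interface PathFactory<T> {
  T newOutputFile();                      // used by an unpartitioned writer
  T newOutputFile(String partitionPath);  // used by a partitioned writer
}

class SimplePathFactory implements PathFactory<String> {
  private final String dataLocation;
  private int fileCount = 0;

  SimplePathFactory(String dataLocation) {
    this.dataLocation = dataLocation;
  }

  // Both overloads share one name generator, so names never collide.
  private String generateFilename() {
    return String.format("%05d.parquet", fileCount++);
  }

  @Override
  public String newOutputFile() {
    return dataLocation + "/" + generateFilename();
  }

  @Override
  public String newOutputFile(String partitionPath) {
    return dataLocation + "/" + partitionPath + "/" + generateFilename();
  }
}
```

Making the interface generic keeps the writers independent of how files are created or encrypted. Only the factory implementation knows about `EncryptedOutputFile`.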





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-04 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321031899
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory<EncryptedOutputFile> {
 
 Review comment:
   Actually, if we merge it, then the `DataWriter` no longer has a handle to 
the currently open file. That handle is what lets us delete empty files in 
`closeCurrent()` and clean up in `abort()`.
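The sketch below shows why the writer keeps its own handle to the open file. `closeCurrent()` can only drop a file that received no rows if it knows which file is current. It assumes a hypothetical in-memory row count per path instead of real appenders and `FileIO`, and `EmptyFileAwareWriter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EmptyFileAwareWriter {
  // Hypothetical stand-in for FileIO: path -> number of rows written to it.
  final Map<String, Integer> storage = new HashMap<>();
  final List<String> completedFiles = new ArrayList<>();
  // Handle to the open file; closeCurrent() needs it to decide what to delete.
  private String currentFile = null;
  private int fileCount = 0;

  EmptyFileAwareWriter() {
    openCurrent();
  }

  void write(String row) {
    storage.merge(currentFile, 1, Integer::sum);
  }

  private void openCurrent() {
    currentFile = "part-" + (fileCount++);
    storage.put(currentFile, 0);
  }

  void closeCurrent() {
    if (currentFile == null) {
      return;
    }
    if (storage.get(currentFile) == 0) {
      // No rows were written: delete the empty file rather than committing it.
      storage.remove(currentFile);
    } else {
      completedFiles.add(currentFile);
    }
    currentFile = null;
  }
}
```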





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-04 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321022808
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory<EncryptedOutputFile> {
 
 Review comment:
   Agreed.





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-04 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321022527
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory<EncryptedOutputFile> {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+  }
+
+  private String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, UUID.randomUUID().toString()));
 
 Review comment:
   Right, it will require a counter now.





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-04 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321022403
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory<EncryptedOutputFile> {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+  }
+
+  private String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, UUID.randomUUID().toString()));
 
 Review comment:
   I was under the impression that the UUID was only there for filename uniqueness. Will 
fix, and add your comment to the code so that its purpose is clear.
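In the quoted `generateFilename()`, `UUID.randomUUID()` is called for every file. Each rolled file would therefore get a different UUID, and the UUID could not link files that were written together. Generating it once per factory fixes that, but then a counter is needed to keep rolled filenames unique. Below is a sketch of that naming scheme. The exact layout (`partition-task-uuid-counter.extension`) is an assumption, and `FilenameGenerator` is a hypothetical name.

```java
import java.util.UUID;

class FilenameGenerator {
  private final int partitionId;
  private final long taskId;
  // Generated once per instance and shared by every file it names, so a
  // recursive listing plus grep finds all of them (e.g. after a failed job).
  private final String uuid = UUID.randomUUID().toString();
  // Keeps rolled files unique now that the UUID no longer changes per file.
  private int fileCount = 0;

  FilenameGenerator(int partitionId, long taskId) {
    this.partitionId = partitionId;
    this.taskId = taskId;
  }

  String uuid() {
    return uuid;
  }

  // Assumed layout: partition-task-uuid-counter.extension
  String nextFilename(String extension) {
    return String.format("%05d-%d-%s-%05d.%s", partitionId, taskId, uuid, fileCount++, extension);
  }
}
```

For example, `new FilenameGenerator(3, 42L)` produces `00003-42-<uuid>-00000.parquet` and then `00003-42-<uuid>-00001.parquet` for the same `<uuid>`.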





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-04 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r321022059
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -303,66 +305,128 @@ public String toString() {
 }
   }
 }
+
+private class EncryptedOutputFileFactory implements OutputFileFactory<EncryptedOutputFile> {
+  private final int partitionId;
+  private final long taskId;
+  private final long epochId;
+
+  EncryptedOutputFileFactory(int partitionId, long taskId, long epochId) {
+this.partitionId = partitionId;
+this.taskId = taskId;
+this.epochId = epochId;
+  }
+
+  private String generateFilename() {
+return format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, UUID.randomUUID().toString()));
+  }
+
+  /**
+   * Generates EncryptedOutputFile for UnpartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile() {
+OutputFile file = fileIo.newOutputFile(locations.newDataLocation(generateFilename()));
+return encryptionManager.encrypt(file);
+  }
+
+  /**
+   * Generates EncryptedOutputFile for PartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile(PartitionKey key) {
+OutputFile rawOutputFile = fileIo.newOutputFile(locations.newDataLocation(spec, key, generateFilename()));
+return encryptionManager.encrypt(rawOutputFile);
+  }
+}
   }
 
   private interface AppenderFactory {
 FileAppender newAppender(OutputFile file, FileFormat format);
   }
 
-  private static class UnpartitionedWriter implements DataWriter<InternalRow>, Closeable {
+  private interface OutputFileFactory<T> {
+T newOutputFile();
+T newOutputFile(PartitionKey key);
+  }
+
+  private static class UnpartitionedWriter implements DataWriter<InternalRow> {
 private final FileIO fileIo;
-private FileAppender appender = null;
-private Metrics metrics = null;
-private List offsetRanges = null;
-private final EncryptedOutputFile file;
+private FileAppender currentAppender = null;
+private final OutputFileFactory fileFactory;
+private final FileFormat format;
+private final AppenderFactory appenderFactory;
+private EncryptedOutputFile currentFile = null;
+private final List completedFiles = Lists.newArrayList();
+private final long targetFileSize;
 
 UnpartitionedWriter(
-EncryptedOutputFile outputFile,
+OutputFileFactory fileFactory,
 FileFormat format,
-AppenderFactory factory,
-FileIO fileIo) {
+AppenderFactory appenderFactory,
+FileIO fileIo,
+long targetFileSize) {
+  this.fileFactory = fileFactory;
+  this.format = format;
+  this.appenderFactory = appenderFactory;
   this.fileIo = fileIo;
-  this.file = outputFile;
-  this.appender = factory.newAppender(file.encryptingOutputFile(), format);
+  this.targetFileSize = targetFileSize;
+
+  openCurrent();
 }
 
 @Override
-public void write(InternalRow record) {
-  appender.add(record);
+public void write(InternalRow record) throws IOException {
 
 Review comment:
   Ah, I missed that amid all the abstraction. Makes sense, will fix.
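The new `write()` signature in this hunk is where size-based rolling would happen. Below is a minimal sketch of one way to do it: check the open file's size against `targetFileSize` before adding a row. It assumes a hypothetical byte counter in place of a real appender's length. `RollingWriter` is a hypothetical name and is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

class RollingWriter {
  private final long targetFileSize;
  // Sizes of files that have been closed, standing in for completed data files.
  final List<Long> completedFileSizes = new ArrayList<>();
  // Hypothetical stand-in for the open appender's current length in bytes.
  private long currentLength = 0;

  RollingWriter(long targetFileSize) {
    this.targetFileSize = targetFileSize;
  }

  // The PR's write() declares IOException, presumably because closing and opening
  // files can fail; this in-memory sketch cannot fail, so it declares nothing.
  void write(byte[] row) {
    // Check before adding, so a new file is only started when a row needs it.
    if (currentLength >= targetFileSize) {
      closeCurrent(); // resetting currentLength plays the role of openCurrent()
    }
    currentLength += row.length;
  }

  void closeCurrent() {
    if (currentLength > 0) {
      completedFileSizes.add(currentLength);
    }
    currentLength = 0;
  }
}
```

Because the check runs before the row is added, a file can overshoot the target by up to one row. With a target of `Long.MAX_VALUE` the writer never rolls.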





[GitHub] [incubator-iceberg] xabriel commented on a change in pull request #432: Allow writers to control size of files generated

2019-09-03 Thread GitBox
xabriel commented on a change in pull request #432: Allow writers to control 
size of files generated
URL: https://github.com/apache/incubator-iceberg/pull/432#discussion_r320344395
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
 ##
 @@ -102,6 +101,7 @@
 this.replacePartitions = replacePartitions;
 this.applicationId = applicationId;
 this.wapId = wapId;
+this.targetFileSize = options.getLong("target-file-size", Long.MAX_VALUE);
 
 Review comment:
   Makes sense. Will add.
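The quoted line reads the `target-file-size` write option with a default of `Long.MAX_VALUE`, presumably through Spark's `DataSourceOptions.getLong(key, default)`. The default means a writer never rolls to a new file unless the option is set. Below is a sketch of the same lookup over a plain `Map`, assuming string-valued options. `WriteOptionsSketch` is a hypothetical name and is not part of Spark or Iceberg.

```java
import java.util.Map;

final class WriteOptionsSketch {
  static final String TARGET_FILE_SIZE = "target-file-size";

  private WriteOptionsSketch() {
  }

  // Missing option -> Long.MAX_VALUE, i.e. files are never rolled by size.
  static long targetFileSize(Map<String, String> options) {
    String value = options.get(TARGET_FILE_SIZE);
    return value == null ? Long.MAX_VALUE : Long.parseLong(value.trim());
  }
}
```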

