[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-09-04 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r320878326
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, 
HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(10);
 
 Review comment:
   Can you revert this change? I'd like to see if it works with the recent 
changes to reduce the number of clients in tests.





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-30 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319589726
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +306,104 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(conf: SerializableConfiguration,
+      sparkDataFiles: Seq[SparkDataFile],
+      partitionSpec: PartitionSpec,
+      basePath: String): Iterator[Manifest] = {
+    if (sparkDataFiles.isEmpty) {
+      Seq.empty.iterator
+    }
 
 Review comment:
   Good catch. I assumed this method had not been changed much. This should be 
fixed by wrapping the write in an else case.
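   
   A minimal sketch of that fix (the `if` above is a Scala expression whose result is discarded, so execution currently falls through to the write even when the input is empty; names are reused from the diff above and the write body from the later revision in this thread):
   
   ```scala
   if (sparkDataFiles.isEmpty) {
     Seq.empty.iterator
   } else {
     val writer = ManifestWriter.write(partitionSpec, outputFile)
     try {
       sparkDataFiles.foreach { file =>
         writer.add(file.toDataFile(partitionSpec))
       }
     } finally {
       writer.close()
     }
     val manifestFile = writer.toManifestFile
     Seq(Manifest(manifestFile.path, manifestFile.length, manifestFile.partitionSpecId)).iterator
   }
   ```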





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-30 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319588722
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, 
HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(10);
 
 Review comment:
   I think that using Spark's catalog will avoid some of these issues because 
tests will reuse the same connection.





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-28 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318684701
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -131,18 +136,22 @@ object SparkTableUtil {
 s"$name=${partition(name)}"
   }.mkString("/")
 
-  DataFiles.builder(spec)
-      .withPath(path)
-      .withFormat(format)
-      .withPartitionPath(partitionKey)
-      .withFileSizeInBytes(fileSize)
-      .withMetrics(new Metrics(rowCount,
-        arrayToMap(columnSizes),
-        arrayToMap(valueCounts),
-        arrayToMap(nullValueCounts),
-        arrayToMap(lowerBounds),
-        arrayToMap(upperBounds)))
-      .build()
+  var builder = DataFiles.builder(spec)
+      .withPath(path)
+      .withFormat(format)
+      .withFileSizeInBytes(fileSize)
+      .withMetrics(new Metrics(rowCount,
+        arrayToMap(columnSizes),
+        arrayToMap(valueCounts),
+        arrayToMap(nullValueCounts),
+        arrayToMap(lowerBounds),
+        arrayToMap(upperBounds)))
+
+  if (partitionKey == "") {
+    builder.build()
+  } else {
+    builder.withPartitionPath(partitionKey).build()
 Review comment:
   Yeah, that looks good to me.





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-28 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318680962
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +306,110 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(conf: SerializableConfiguration,
+      sparkDataFiles: Seq[SparkDataFile],
+      partitionSpec: PartitionSpec,
+      basePath: String): Iterator[Manifest] = {
+    if (sparkDataFiles.isEmpty) {
+      Seq.empty.iterator
+    }
+
+    val io = new HadoopFileIO(conf.get())
+    val ctx = TaskContext.get()
+    val location = new Path(basePath,
+      s"stage-${ctx.stageId()}-task-${ctx.taskAttemptId()}-manifest")
+    val outputFile = io.newOutputFile(FileFormat.AVRO.addExtension(location.toString))
+    val writer = ManifestWriter.write(partitionSpec, outputFile)
+    try {
+      sparkDataFiles.foreach { file =>
+        writer.add(file.toDataFile(partitionSpec))
+      }
+    } finally {
+      writer.close()
+    }
+
+    val manifestFile = writer.toManifestFile
+    Seq(Manifest(manifestFile.path, manifestFile.length, manifestFile.partitionSpecId)).iterator
+  }
+
+  private case class Manifest(location: String, fileLength: Long, specId: Int) {
+    def toManifestFile: ManifestFile = new ManifestFile {
+      override def path: String = location
+
+      override def length: Long = fileLength
+
+      override def partitionSpecId: Int = specId
+
+      override def snapshotId: java.lang.Long = null
+
+      override def addedFilesCount: Integer = null
+
+      override def existingFilesCount: Integer = null
+
+      override def deletedFilesCount: Integer = null
+
+      override def partitions: java.util.List[ManifestFile.PartitionFieldSummary] = null
+
+      override def copy: ManifestFile = this
+    }
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   * @param numOfManifest the expected number of manifest file to be created
+   * @param stagingDir the staging directory to store temporary manifest file
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(
+      source: TableIdentifier,
+      location: String,
+      numOfManifest: Int,
+      stagingDir: String): Table = {
+    val sparkSession = SparkSession.builder().getOrCreate()
+    import sparkSession.sqlContext.implicits._
+
+    val dbName = source.database.getOrElse("default")
+    val tableName = source.table
+
+    if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+    }
+
+    val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+    val conf = sparkSession.sparkContext.hadoopConfiguration
+    val serializableConfiguration = new SerializableConfiguration(conf)
+    val tables = new HadoopTables(conf)
+    val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName")
+    val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location)
+    val appender = table.newAppend()
+
+    if (partitionSpec == PartitionSpec.unpartitioned) {
+      val catalogTable = Hive.getTable(sparkSession, s"$dbName.$tableName")
+      val files = listPartition(Map.empty[String, String], catalogTable.location.toString,
+        catalogTable.storage.serde.getOrElse("none"))
+      files.foreach { f => appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned)) }
+    } else {
+      val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+      val manifests = partitions.flatMap { row =>
+        listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
+      }.repartition(numOfManifest).mapPartitions {
 
 Review comment:
   I think we need to change a couple of things:
   1. Use `spark.sqlContext.conf.numShufflePartitions` instead of passing in a 
partition size to repartition. This parallelism setting is already something 
users can configure.
   2. Add a sort by the file path to cluster data by partition into manifests. 
The converted table will have better performance.
   
   I think it should look like this:
   
   ```scala
   val manifests = partitions.flatMap { row =>
       listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
     }.repartition(spark.sqlContext.conf.numShufflePartitions)
     .orderBy($"path")
     .mapPartitions {
       ...
     }.collect().map(_.toManifestFile)
   ```
   
   The repartition is still needed to ensure that the partition listing isn't done twice.

[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-28 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318677245
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -131,18 +136,22 @@ object SparkTableUtil {
 s"$name=${partition(name)}"
   }.mkString("/")
 
-  DataFiles.builder(spec)
-      .withPath(path)
-      .withFormat(format)
-      .withPartitionPath(partitionKey)
-      .withFileSizeInBytes(fileSize)
-      .withMetrics(new Metrics(rowCount,
-        arrayToMap(columnSizes),
-        arrayToMap(valueCounts),
-        arrayToMap(nullValueCounts),
-        arrayToMap(lowerBounds),
-        arrayToMap(upperBounds)))
-      .build()
+  var builder = DataFiles.builder(spec)
+      .withPath(path)
+      .withFormat(format)
+      .withFileSizeInBytes(fileSize)
+      .withMetrics(new Metrics(rowCount,
+        arrayToMap(columnSizes),
+        arrayToMap(valueCounts),
+        arrayToMap(nullValueCounts),
+        arrayToMap(lowerBounds),
+        arrayToMap(upperBounds)))
+
+  if (partitionKey == "") {
+    builder.build()
+  } else {
+    builder.withPartitionPath(partitionKey).build()
 
 Review comment:
   That's correct.
   
   Iceberg considers conversion from partition tuple to partition path a 
one-way conversion. We only want to use `withPartitionPath` for conversions 
like this one where we are importing data from Hive tables. In that case, it 
isn't clear what the date format will be because many people use strings for 
date partition fields.
   
   I think we could probably add support for ISO-8601 dates, but we want to 
avoid putting a lot of effort into parsing strings into partition values.
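   
   As an illustration (hypothetical partition paths, not from this PR), the same logical date can be written `dt=2019-08-28`, `dt=20190828`, or `dt=08/28/2019`, so parsing the string back into a typed partition value is unreliable. On import the path is taken at face value instead:
   
   ```scala
   // Sketch: withPartitionPath accepts the Hive-style partition key verbatim
   val dataFile = DataFiles.builder(spec)
       .withPath(path)
       .withFormat(format)
       .withFileSizeInBytes(fileSize)
       .withPartitionPath(partitionKey) // e.g. the hypothetical "dt=2019-08-28"
       .build()
   ```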





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-28 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318669321
 
 

 ##
 File path: 
spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
 ##
 @@ -116,4 +123,28 @@ public void testPartitionScanByFilter() {
 Dataset partitionDF = SparkTableUtil.partitionDFByFilter(spark, 
qualifiedTableName, "data = 'a'");
 Assert.assertEquals("There should be 1 matching partition", 1, 
partitionDF.count());
   }
+
+  @Test
+  public void testImportPartitionedTable() throws Exception {
 
 Review comment:
   Actually, I think it is okay to commit this with support for Hadoop tables 
and add the Hive table support later.





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-27 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318254975
 
 

 ##
 File path: 
spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
 ##
 @@ -116,4 +123,28 @@ public void testPartitionScanByFilter() {
 Dataset partitionDF = SparkTableUtil.partitionDFByFilter(spark, 
qualifiedTableName, "data = 'a'");
 Assert.assertEquals("There should be 1 matching partition", 1, 
partitionDF.count());
   }
+
+  @Test
+  public void testImportPartitionedTable() throws Exception {
 
 Review comment:
   Can you run these tests to create Hive catalog tables as well?





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-27 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318254975
 
 

 ##
 File path: 
spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
 ##
 @@ -116,4 +123,28 @@ public void testPartitionScanByFilter() {
 Dataset partitionDF = SparkTableUtil.partitionDFByFilter(spark, 
qualifiedTableName, "data = 'a'");
 Assert.assertEquals("There should be 1 matching partition", 1, 
partitionDF.count());
   }
+
+  @Test
+  public void testImportPartitionedTable() throws Exception {
 
 Review comment:
   Can you run these tests with Hive tables as well?





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-27 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318200212
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+      sparkDataFiles: Seq[SparkDataFile],
+      partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
 
 Review comment:
   This can't be local because it is written on executors and read from the 
driver. If those are on different machines, the driver won't be able to find 
the location.





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-26 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317712572
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+      sparkDataFiles: Seq[SparkDataFile],
+      partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
 
 Review comment:
   Maybe you should have users pass in a location to use for this?





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-26 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317712345
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+      sparkDataFiles: Seq[SparkDataFile],
+      partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
 
 Review comment:
   This location must be a shared location, either in HDFS or a distributed FS 
like S3. It can't be on the local host.





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-26 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317711705
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + 
UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  sparkDataFiles.foreach { file =>
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not 
exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, 
s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, 
s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), 
location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
+  val tableMetadata = 
sparkSession.sessionState.catalog.getTableMetadata(source)
+  val format = tableMetadata.provider.getOrElse("none")
+
+  if (format != "avro" && format != "parquet" && format != "orc") {
+throw new UnsupportedOperationException(s"Unsupported format: $format")
+  }
+  listPartition(Map.empty[String, String], tableMetadata.location.toString,
+format).foreach{f => 
appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+  appender.commit()
+} else {
+  val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+  partitions.flatMap { row =>
+listPartition(row.getMap[String, String](0).toMap, row.getString(1), 
row.getString(2))
+  }.coalesce(1).mapPartitions {
 
 Review comment:
   And here's the `Manifest` case class:
   
   ```scala
   private[sql] case class Manifest(location: String, fileLength: Long, specId: Int) {
     def toManifestFile: ManifestFile = new ManifestFile {
       override def path: String = location
   
       override def length: Long = fileLength
   
       override def partitionSpecId: Int = specId
   
       override def snapshotId: java.lang.Long = null
   
       override def addedFilesCount: Integer = null
   
       override def existingFilesCount: Integer = null
   
       override def deletedFilesCount: Integer = null
   
       override def partitions: java.util.List[ManifestFile.PartitionFieldSummary] = null
   
       override def copy: ManifestFile = this
     }
   }
   ```





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-26 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317711446
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + 
UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  sparkDataFiles.foreach { file =>
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not 
exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, 
s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, 
s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), 
location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
+  val tableMetadata = 
sparkSession.sessionState.catalog.getTableMetadata(source)
+  val format = tableMetadata.provider.getOrElse("none")
+
+  if (format != "avro" && format != "parquet" && format != "orc") {
+throw new UnsupportedOperationException(s"Unsupported format: $format")
+  }
+  listPartition(Map.empty[String, String], tableMetadata.location.toString,
+format).foreach{f => 
appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+  appender.commit()
+} else {
+  val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+  partitions.flatMap { row =>
+listPartition(row.getMap[String, String](0).toMap, row.getString(1), 
row.getString(2))
+  }.coalesce(1).mapPartitions {
 
 Review comment:
   Here's `writeManifest`:
   
   ```scala
   def writeManifest(
       conf: SerializableConfiguration,
       spec: PartitionSpec,
       basePath: String): Iterator[SparkDataFile] => Iterator[Manifest] = {
     files =>
       if (files.hasNext) {
         val ctx = TaskContext.get()
         val manifestLocation = new Path(basePath,
           s"stage-${ctx.stageId()}-task-${ctx.taskAttemptId()}-manifest.avro").toString
         val io = new HadoopFileIO(conf.value)
         val writer = ManifestWriter.write(spec, io.newOutputFile(manifestLocation))
   
         try {
           files.foreach { file =>
             writer.add(file.toDataFile(spec))
           }
         } finally {
           writer.close()
         }
   
         val manifest = writer.toManifestFile
         Seq(Manifest(manifest.path, manifest.length, manifest.partitionSpecId)).iterator
   
       } else {
         Seq.empty.iterator
       }
   }
   ```
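   
   Since `writeManifest(conf, spec, basePath)` returns an `Iterator[SparkDataFile] => Iterator[Manifest]` function, it plugs straight into `mapPartitions`; a usage sketch, assuming `conf`, `spec`, and `basePath` are in scope:
   
   ```scala
   val manifests = files
       .mapPartitions(writeManifest(conf, spec, basePath))
       .collect()
       .map(_.toManifestFile)
   ```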





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-26 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317710889
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + 
UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  sparkDataFiles.foreach { file =>
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not 
exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, 
s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, 
s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), 
location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
+  val tableMetadata = 
sparkSession.sessionState.catalog.getTableMetadata(source)
+  val format = tableMetadata.provider.getOrElse("none")
+
+  if (format != "avro" && format != "parquet" && format != "orc") {
+throw new UnsupportedOperationException(s"Unsupported format: $format")
+  }
+  listPartition(Map.empty[String, String], tableMetadata.location.toString,
+format).foreach{f => 
appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+  appender.commit()
+} else {
+  val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+  partitions.flatMap { row =>
+listPartition(row.getMap[String, String](0).toMap, row.getString(1), 
row.getString(2))
+  }.coalesce(1).mapPartitions {
+files =>
+  val tables = new HadoopTables(new Configuration())
 
 Review comment:
   Can you use `SerializableConfiguration` instead? This should use Spark's 
`hadoopConfiguration`.
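   
   A sketch of the suggested change, assuming the `org.apache.spark.util.SerializableConfiguration` wrapper used elsewhere in this thread:
   
   ```scala
   // Driver side: wrap the session's Hadoop configuration so Spark can serialize it
   val serializableConf = new SerializableConfiguration(sparkSession.sparkContext.hadoopConfiguration)
   
   // Executor side, inside mapPartitions: unwrap it instead of building new Configuration()
   val tables = new HadoopTables(serializableConf.value)
   ```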





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-26 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317711151
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + 
UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  sparkDataFiles.foreach { file =>
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not 
exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, 
s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, 
s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), 
location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
+  val tableMetadata = 
sparkSession.sessionState.catalog.getTableMetadata(source)
+  val format = tableMetadata.provider.getOrElse("none")
+
+  if (format != "avro" && format != "parquet" && format != "orc") {
 
 Review comment:
   This should use the same logic that `partitionDF` uses to detect the format. 
It should check the serde's input format.
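   
   A sketch of what that could look like, assuming Spark's `CatalogTable.storage` metadata (how the serde string maps to a file format is left to `listPartition`):
   
   ```scala
   // Prefer the storage serde over the table provider, as partitionDF does,
   // falling back to the input format class
   val storage = tableMetadata.storage
   val format = storage.serde.orElse(storage.inputFormat).getOrElse("none")
   ```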





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-26 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317710578
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + 
UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  sparkDataFiles.foreach { file =>
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not 
exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, 
s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, 
s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), 
location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
+  val tableMetadata = 
sparkSession.sessionState.catalog.getTableMetadata(source)
+  val format = tableMetadata.provider.getOrElse("none")
+
+  if (format != "avro" && format != "parquet" && format != "orc") {
+throw new UnsupportedOperationException(s"Unsupported format: $format")
+  }
+  listPartition(Map.empty[String, String], tableMetadata.location.toString,
+format).foreach{f => 
appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+  appender.commit()
+} else {
+  val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+  partitions.flatMap { row =>
+listPartition(row.getMap[String, String](0).toMap, row.getString(1), 
row.getString(2))
+  }.coalesce(1).mapPartitions {
 
 Review comment:
   Why `coalesce(1)` here?
   
   In our version of this, we add a sort by file name and build manifests in parallel:
   
   ```scala
   val tempPath = new Path(s"hdfs:/tmp/iceberg-conversions/$applicationId")
   
   val manifests: Seq[ManifestFile] = files
       .repartition(100) // repartition to shuffle the data and not list partitions twice
       .orderBy($"path")
       .mapPartitions(writeManifest) // writes manifests to tempPath
       .collect()
       .map(_.toManifestFile)
   
   val append = table.newAppend
   manifests.foreach(append.appendManifest)
   append.commit()
   ```





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-26 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317708705
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+sparkDataFiles: Seq[SparkDataFile],
+partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + 
UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  for (file <- sparkDataFiles) {
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not 
exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, 
s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, 
s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), 
location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
 
 Review comment:
   No, that's okay. If the table isn't partitioned, we don't need to get the 
partitions as a dataframe. It is fine to run this on the driver in that case.





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317377838
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+sparkDataFiles: Seq[SparkDataFile],
+partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + 
UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  for (file <- sparkDataFiles) {
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not 
exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, 
s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, 
s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), 
location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
 
 Review comment:
   Why is there a special case for unpartitioned tables? Doesn't `partitionDF` 
return a single partition in that case?





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373956
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+sparkDataFiles: Seq[SparkDataFile],
+partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + 
UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  for (file <- sparkDataFiles) {
 
 Review comment:
   Style: please use `foreach` instead of `for` loops in scala.
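   
   For example, the loop above rewritten with `foreach`, as it appears in the later revision of this diff:
   
   ```scala
   sparkDataFiles.foreach { file =>
     writer.add(file.toDataFile(partitionSpec))
   }
   ```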





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373945
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+sparkDataFiles: Seq[SparkDataFile],
 
 Review comment:
   Nit: indentation is off.
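   
   A sketch of the expected continuation indent (an assumption based on the later revisions quoted in this thread):
   
   ```scala
   private def buildManifest(table: Table,
       sparkDataFiles: Seq[SparkDataFile],
       partitionSpec: PartitionSpec): ManifestFile = {
   ```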





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373916
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, 
HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(8);
 
 Review comment:
   What have previous tests not released? What is the "default release time"?





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373882
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -111,6 +111,7 @@ private HiveConf newHiveConf(int port) {
 HiveConf newHiveConf = new HiveConf(new Configuration(), 
TestHiveMetastore.class);
 newHiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, 
"thrift://localhost:" + port);
 newHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + 
hiveLocalDir.getAbsolutePath());
+newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, 
"false");
 
 Review comment:
   What warning?





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373878
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -19,18 +19,22 @@
 
 package org.apache.iceberg.spark
 
+import com.google.common.collect.ImmutableMap
 import com.google.common.collect.Maps
 import java.nio.ByteBuffer
 import java.util
+import java.util.UUID
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.iceberg.{DataFile, DataFiles, Metrics, MetricsConfig, 
PartitionSpec}
-import org.apache.iceberg.hadoop.HadoopInputFile
+import org.apache.iceberg._
 
 Review comment:
   We never use wildcard imports.
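   
   For example, the explicit form of the import that the wildcard replaced (the added names are inferred from the new code in this PR):
   
   ```scala
   import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile, ManifestWriter, Metrics, MetricsConfig, PartitionSpec, Table}
   ```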





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-20 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315824567
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -19,18 +19,22 @@
 
 package org.apache.iceberg.spark
 
+import com.google.common.collect.ImmutableMap
 import com.google.common.collect.Maps
 import java.nio.ByteBuffer
 import java.util
+import java.util.UUID
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.iceberg.{DataFile, DataFiles, Metrics, MetricsConfig, 
PartitionSpec}
-import org.apache.iceberg.hadoop.HadoopInputFile
+import org.apache.iceberg._
 
 Review comment:
   Please don't use wildcard imports.





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-20 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315824445
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -111,6 +111,7 @@ private HiveConf newHiveConf(int port) {
 HiveConf newHiveConf = new HiveConf(new Configuration(), 
TestHiveMetastore.class);
 newHiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, 
"thrift://localhost:" + port);
 newHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + 
hiveLocalDir.getAbsolutePath());
+newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, 
"false");
 
 Review comment:
   What does this do?





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-20 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315824375
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, 
HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(8);
 
 Review comment:
   Why was this change required?





[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-15 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r314399797
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,88 @@ object SparkTableUtil {
       )
     }
   }
+
+  def buildManifest(table: Table,
+                    sparkDataFiles: Seq[SparkDataFile],
+                    partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+    val writer = ManifestWriter.write(partitionSpec, outputFile)
+    try {
+      for (file <- sparkDataFiles) {
+        writer.add(file.toDataFile(partitionSpec))
+      }
+    } finally {
+      writer.close()
+    }
+
+    writer.toManifestFile
+  }
+
+  def partitionToMap(partition: String): Map[String, String] = {
+    val map = new mutable.HashMap[String, String]()
+    val list = partition.split("/")
+    list.foreach { str =>
+      val kv = str.split("=")
+      map.put(kv(0), kv(1))
+    }
+
+    map.toMap
+  }
+
+  /**
+   * Migrate a Spark table to an Iceberg table.
+   *
+   * The migration uses the Spark session to get table metadata. It assumes no
+   * operation is going on the original table or the target table, and thus is
+   * not thread-safe.
+   *
+   * @param dbName the database name of the table to be migrated
+   * @param tableName the table to be migrated
+   * @param table the target table to migrate into
+   *
+   * @return table the target table
+   */
+  def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = {
+    val sparkSession = SparkSession.builder().getOrCreate()
+
+    if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+    }
+
+    val tableMetadata = sparkSession.sessionState.catalog
+      .getTableMetadata(new TableIdentifier(tableName, Some(dbName)))
+
+    val format = tableMetadata.provider.getOrElse("none")
+    if (format != "avro" && format != "parquet" && format != "orc") {
+      throw new UnsupportedOperationException(s"Unsupported format: $format")
+    }
+
+    val location = tableMetadata.location.toString
+    val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+
+    val fastAppender = table.newFastAppend()
+
+    val partitions = sparkSession.sessionState.catalog.externalCatalog
+      .listPartitionNames(dbName, tableName)
+    if (partitions.isEmpty) {
+      val dataFiles = SparkTableUtil.listPartition(Map.empty[String, String], location, format)
+      fastAppender.appendManifest(buildManifest(table, dataFiles, PartitionSpec.unpartitioned))
+    } else {
+      // Retrieve data files according to partition. result = [[datafiles], [datafiles]]
+      val dataFiles = partitions.map { e =>
+        SparkTableUtil.listPartition(partitionToMap(e), location + "/" + e, format)
+      }
+
+      // Append manifest for each partition
+      dataFiles.foreach { partition =>
+        fastAppender.appendManifest(buildManifest(table, partition, partitionSpec))
+      }
+    }
+
+    fastAppender.apply()
 
 Review comment:
   There's no need to call `apply()`; just call `commit()`.
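   
   For reference, a minimal sketch of the suggested ending (assuming the standard Iceberg `PendingUpdate` API, where `commit()` applies and atomically commits the pending snapshot in a single call):
   
   ```scala
   // commit() validates, applies, and atomically commits the pending append,
   // so a separate apply() step is unnecessary.
   fastAppender.commit()
   
   table
   ```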


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-15 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r314399700
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,88 @@ object SparkTableUtil {
       )
     }
   }
+
+  def buildManifest(table: Table,
+                    sparkDataFiles: Seq[SparkDataFile],
+                    partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+    val writer = ManifestWriter.write(partitionSpec, outputFile)
+    try {
+      for (file <- sparkDataFiles) {
+        writer.add(file.toDataFile(partitionSpec))
+      }
+    } finally {
+      writer.close()
+    }
+
+    writer.toManifestFile
+  }
+
+  def partitionToMap(partition: String): Map[String, String] = {
+    val map = new mutable.HashMap[String, String]()
+    val list = partition.split("/")
+    list.foreach { str =>
+      val kv = str.split("=")
+      map.put(kv(0), kv(1))
+    }
+
+    map.toMap
+  }
+
+  /**
+   * Migrate a Spark table to an Iceberg table.
+   *
+   * The migration uses the Spark session to get table metadata. It assumes no
+   * operation is going on the original table or the target table, and thus is
+   * not thread-safe.
+   *
+   * @param dbName the database name of the table to be migrated
+   * @param tableName the table to be migrated
+   * @param table the target table to migrate into
+   *
+   * @return table the target table
+   */
+  def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = {
+    val sparkSession = SparkSession.builder().getOrCreate()
+
+    if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+    }
+
+    val tableMetadata = sparkSession.sessionState.catalog
+      .getTableMetadata(new TableIdentifier(tableName, Some(dbName)))
+
+    val format = tableMetadata.provider.getOrElse("none")
+    if (format != "avro" && format != "parquet" && format != "orc") {
+      throw new UnsupportedOperationException(s"Unsupported format: $format")
+    }
+
+    val location = tableMetadata.location.toString
+    val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+
+    val fastAppender = table.newFastAppend()
+
+    val partitions = sparkSession.sessionState.catalog.externalCatalog
+      .listPartitionNames(dbName, tableName)
 
 Review comment:
   This should use `partitionDF` and call `listPartition` in parallel for large 
tables. See the example notebook: 
https://github.com/apache/incubator-iceberg/blob/master/examples/Convert%20table%20to%20Iceberg.ipynb
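   
   As a rough illustration (a sketch, not this PR's code; it assumes `partitionDF` yields one `(partition values, location, format)` row per partition and that `spark.implicits._` can derive an encoder for `SparkDataFile`):
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   val spark = SparkSession.builder().getOrCreate()
   import spark.implicits._
   
   // One row per partition, read from the session catalog.
   val partitions = SparkTableUtil.partitionDF(spark, s"$dbName.$tableName")
   
   // List each partition's data files on the executors rather than the
   // driver, so large tables are scanned in parallel.
   val dataFiles = partitions
     .as[(Map[String, String], String, String)]
     .flatMap { case (partition, uri, format) =>
       SparkTableUtil.listPartition(partition, uri, format)
     }
     .collect()
   ```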


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-15 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r314399396
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,88 @@ object SparkTableUtil {
       )
     }
   }
+
+  def buildManifest(table: Table,
+                    sparkDataFiles: Seq[SparkDataFile],
+                    partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+    val writer = ManifestWriter.write(partitionSpec, outputFile)
+    try {
+      for (file <- sparkDataFiles) {
+        writer.add(file.toDataFile(partitionSpec))
+      }
+    } finally {
+      writer.close()
+    }
+
+    writer.toManifestFile
+  }
+
+  def partitionToMap(partition: String): Map[String, String] = {
+    val map = new mutable.HashMap[String, String]()
+    val list = partition.split("/")
+    list.foreach { str =>
+      val kv = str.split("=")
+      map.put(kv(0), kv(1))
+    }
+
+    map.toMap
+  }
+
+  /**
+   * Migrate a Spark table to an Iceberg table.
+   *
+   * The migration uses the Spark session to get table metadata. It assumes no
+   * operation is going on the original table or the target table, and thus is
+   * not thread-safe.
+   *
+   * @param dbName the database name of the table to be migrated
+   * @param tableName the table to be migrated
+   * @param table the target table to migrate into
+   *
+   * @return table the target table
+   */
+  def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = {
+    val sparkSession = SparkSession.builder().getOrCreate()
+
+    if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+    }
+
+    val tableMetadata = sparkSession.sessionState.catalog
+      .getTableMetadata(new TableIdentifier(tableName, Some(dbName)))
+
+    val format = tableMetadata.provider.getOrElse("none")
 
 Review comment:
   This should convert any Hive table, not just Spark DataSource tables. That's 
why the `listPartition` method supports a format for each partition. Use 
`partitionDF` to get a dataframe with each partition and format.
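   
   A sketch of that shape (hedged: it reuses `table`, `partitionSpec`, and `buildManifest` from the diff above, assumes `spark` and its implicits are in scope, and assumes the `partitionDF` column order is partition values, location, format):
   
   ```scala
   val append = table.newAppend()
   
   SparkTableUtil.partitionDF(spark, s"$dbName.$tableName")
     .as[(Map[String, String], String, String)]
     .collect()
     .foreach { case (values, uri, format) =>
       // The format travels with each partition, so a Hive table whose
       // partitions mix avro/parquet/orc can be converted too.
       val files = SparkTableUtil.listPartition(values, uri, format)
       append.appendManifest(buildManifest(table, files, partitionSpec))
     }
   
   append.commit()
   ```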


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-15 Thread GitBox
rdblue commented on a change in pull request #374: Migrate spark table to 
iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r314397922
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,88 @@ object SparkTableUtil {
       )
     }
   }
+
+  def buildManifest(table: Table,
+                    sparkDataFiles: Seq[SparkDataFile],
+                    partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+    val writer = ManifestWriter.write(partitionSpec, outputFile)
+    try {
+      for (file <- sparkDataFiles) {
+        writer.add(file.toDataFile(partitionSpec))
+      }
+    } finally {
+      writer.close()
+    }
+
+    writer.toManifestFile
+  }
+
+  def partitionToMap(partition: String): Map[String, String] = {
+    val map = new mutable.HashMap[String, String]()
+    val list = partition.split("/")
+    list.foreach { str =>
+      val kv = str.split("=")
+      map.put(kv(0), kv(1))
+    }
+
+    map.toMap
+  }
+
+  /**
+   * Migrate a Spark table to an Iceberg table.
+   *
+   * The migration uses the Spark session to get table metadata. It assumes no
+   * operation is going on the original table or the target table, and thus is
+   * not thread-safe.
+   *
+   * @param dbName the database name of the table to be migrated
+   * @param tableName the table to be migrated
+   * @param table the target table to migrate into
+   *
+   * @return table the target table
+   */
+  def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = {
+    val sparkSession = SparkSession.builder().getOrCreate()
+
+    if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+    }
+
+    val tableMetadata = sparkSession.sessionState.catalog
+      .getTableMetadata(new TableIdentifier(tableName, Some(dbName)))
+
+    val format = tableMetadata.provider.getOrElse("none")
+    if (format != "avro" && format != "parquet" && format != "orc") {
+      throw new UnsupportedOperationException(s"Unsupported format: $format")
+    }
+
+    val location = tableMetadata.location.toString
+    val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+
+    val fastAppender = table.newFastAppend()
 
 Review comment:
   I don't think there is a reason to use fast appends.
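   
   For context, a one-line sketch of the alternative (hedged summary: `newFastAppend` always writes a new manifest per commit, while the regular append may compact small manifests as part of the commit, leaving the migrated table with healthier metadata):
   
   ```scala
   // Regular append: Iceberg may merge small manifests on commit.
   val appender = table.newAppend()
   ```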


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org