[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r320878326

## File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java ##
@@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce
         .transportFactory(new TTransportFactory())
         .protocolFactory(new TBinaryProtocol.Factory())
         .minWorkerThreads(3)
-        .maxWorkerThreads(5);
+        .maxWorkerThreads(10);

Review comment:
Can you revert this change? I'd like to see if it works with the recent changes to reduce the number of clients in tests.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319589726

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -297,5 +306,104 @@ object SparkTableUtil {
         )
       }
     }
+
+  private def buildManifest(conf: SerializableConfiguration,
+                            sparkDataFiles: Seq[SparkDataFile],
+                            partitionSpec: PartitionSpec,
+                            basePath: String): Iterator[Manifest] = {
+    if (sparkDataFiles.isEmpty) {
+      Seq.empty.iterator
+    }

Review comment:
Good catch. I assumed this method had not been changed much. This should be fixed by wrapping the write in an else case.
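The bug comes from Scala's expression semantics: an `if` without an `else` has type `Unit`, so `Seq.empty.iterator` is evaluated and thrown away, and execution falls through to the write even when there are no files. A minimal, stdlib-only sketch of the problem and of the "wrap the write in an else" fix; `writeBuggy`, `writeFixed`, and the `written` counter are hypothetical stand-ins for the manifest write:

```scala
object EmptyGuardDemo {
  // Counts how many times the (hypothetical) manifest write runs.
  var written = 0

  private def write(files: Seq[String]): Iterator[String] = {
    written += 1
    Iterator.single(s"manifest-with-${files.size}-files")
  }

  // Buggy shape: the `if` has no `else`, so its value is discarded
  // and the write still runs for an empty input.
  def writeBuggy(files: Seq[String]): Iterator[String] = {
    if (files.isEmpty) {
      Seq.empty[String].iterator
    }
    write(files)
  }

  // Fixed shape: the write only happens in the `else` branch, and the
  // whole `if`/`else` expression is the method's result.
  def writeFixed(files: Seq[String]): Iterator[String] = {
    if (files.isEmpty) {
      Seq.empty[String].iterator
    } else {
      write(files)
    }
  }
}
```

The compiler only warns about the discarded pure expression, which is why this kind of early "return" slips through review.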
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319588722

## File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java ##
@@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce
         .transportFactory(new TTransportFactory())
         .protocolFactory(new TBinaryProtocol.Factory())
         .minWorkerThreads(3)
-        .maxWorkerThreads(5);
+        .maxWorkerThreads(10);

Review comment:
I think that using Spark's catalog will avoid some of these issues because tests will reuse the same connection.
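One way to read this suggestion: with a thread-pool Thrift server, each open client connection typically occupies a worker thread, so tests that each open their own metastore client can exhaust `maxWorkerThreads`, while one client reused across tests keeps the count at one. A hypothetical sketch of lazily creating a single shared client; `FakeClient` and `SharedClientDemo` are illustrative only, not Iceberg, Hive, or Spark APIs:

```scala
import java.util.concurrent.atomic.AtomicInteger

object SharedClientDemo {
  // Number of connections opened so far (a stand-in for server worker threads in use).
  val opened = new AtomicInteger(0)

  // Hypothetical client; constructing it is the expensive, thread-consuming step.
  final class FakeClient {
    opened.incrementAndGet()
    def tableExists(name: String): Boolean = name.nonEmpty
  }

  // One client, created on first use and reused by every test afterwards.
  lazy val shared: FakeClient = new FakeClient

  // Each "test" borrows the shared client instead of opening a new one.
  def runTest(table: String): Boolean = shared.tableExists(table)
}
```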
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318684701

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -131,18 +136,22 @@ object SparkTableUtil {
           s"$name=${partition(name)}"
         }.mkString("/")
-      DataFiles.builder(spec)
-          .withPath(path)
-          .withFormat(format)
-          .withPartitionPath(partitionKey)
-          .withFileSizeInBytes(fileSize)
-          .withMetrics(new Metrics(rowCount,
-            arrayToMap(columnSizes),
-            arrayToMap(valueCounts),
-            arrayToMap(nullValueCounts),
-            arrayToMap(lowerBounds),
-            arrayToMap(upperBounds)))
-          .build()
+      var builder = DataFiles.builder(spec)
+        .withPath(path)
+        .withFormat(format)
+        .withFileSizeInBytes(fileSize)
+        .withMetrics(new Metrics(rowCount,
+          arrayToMap(columnSizes),
+          arrayToMap(valueCounts),
+          arrayToMap(nullValueCounts),
+          arrayToMap(lowerBounds),
+          arrayToMap(upperBounds)))
+
+      if (partitionKey == "") {
+        builder.build()
+      } else {
+        builder.withPartitionPath(partitionKey).build()

Review comment:
Yeah, that looks good to me.
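The diff builds a Hive-style partition key by joining `name=value` pairs with `/`. For an unpartitioned table that join produces the empty string, which is why `withPartitionPath` is only called when the key is non-empty. A stdlib-only sketch of that key construction; `partitionKey`, `describe`, and the `fieldNames`/`partition` inputs are hypothetical simplifications of the values in the PR:

```scala
object PartitionKeyDemo {
  // Builds a Hive-style partition path such as "year=2019/month=08".
  // `fieldNames` follows the partition spec order; `partition` holds raw string values.
  def partitionKey(fieldNames: Seq[String], partition: Map[String, String]): String =
    fieldNames.map { name =>
      s"$name=${partition(name)}"
    }.mkString("/")

  // Mirrors the branch in the diff: only attach a partition path when one exists.
  def describe(fieldNames: Seq[String], partition: Map[String, String]): String = {
    val key = partitionKey(fieldNames, partition)
    if (key == "") "unpartitioned file" else s"file in partition $key"
  }
}
```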
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318680962

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -297,5 +306,110 @@ object SparkTableUtil {
         )
       }
     }
+
+  private def buildManifest(conf: SerializableConfiguration,
+                            sparkDataFiles: Seq[SparkDataFile],
+                            partitionSpec: PartitionSpec,
+                            basePath: String): Iterator[Manifest] = {
+    if (sparkDataFiles.isEmpty) {
+      Seq.empty.iterator
+    }
+
+    val io = new HadoopFileIO(conf.get())
+    val ctx = TaskContext.get()
+    val location = new Path(basePath,
+      s"stage-${ctx.stageId()}-task-${ctx.taskAttemptId()}-manifest")
+    val outputFile = io.newOutputFile(FileFormat.AVRO.addExtension(location.toString))
+    val writer = ManifestWriter.write(partitionSpec, outputFile)
+    try {
+      sparkDataFiles.foreach { file =>
+        writer.add(file.toDataFile(partitionSpec))
+      }
+    } finally {
+      writer.close()
+    }
+
+    val manifestFile = writer.toManifestFile
+    Seq(Manifest(manifestFile.path, manifestFile.length, manifestFile.partitionSpecId)).iterator
+  }
+
+  private case class Manifest(location: String, fileLength: Long, specId: Int) {
+    def toManifestFile: ManifestFile = new ManifestFile {
+      override def path: String = location
+
+      override def length: Long = fileLength
+
+      override def partitionSpecId: Int = specId
+
+      override def snapshotId: java.lang.Long = null
+
+      override def addedFilesCount: Integer = null
+
+      override def existingFilesCount: Integer = null
+
+      override def deletedFilesCount: Integer = null
+
+      override def partitions: java.util.List[ManifestFile.PartitionFieldSummary] = null
+
+      override def copy: ManifestFile = this
+    }
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   * @param numOfManifest the expected number of manifest file to be created
+   * @param stagingDir the staging directory to store temporary manifest file
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(
+      source: TableIdentifier,
+      location: String,
+      numOfManifest: Int,
+      stagingDir: String): Table = {
+    val sparkSession = SparkSession.builder().getOrCreate()
+    import sparkSession.sqlContext.implicits._
+
+    val dbName = source.database.getOrElse("default")
+    val tableName = source.table
+
+    if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+    }
+
+    val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+    val conf = sparkSession.sparkContext.hadoopConfiguration
+    val serializableConfiguration = new SerializableConfiguration(conf)
+    val tables = new HadoopTables(conf)
+    val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName")
+    val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location)
+    val appender = table.newAppend()
+
+    if (partitionSpec == PartitionSpec.unpartitioned) {
+      val catalogTable = Hive.getTable(sparkSession, s"$dbName.$tableName")
+      val files = listPartition(Map.empty[String, String], catalogTable.location.toString,
+        catalogTable.storage.serde.getOrElse("none"))
+      files.foreach{f => appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+    } else {
+      val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+      val manifests = partitions.flatMap { row =>
+        listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
+      }.repartition(numOfManifest).mapPartitions {

Review comment:
I think we need to change a couple of things:

1. Use `spark.sqlContext.conf.numShufflePartitions` instead of passing in a number of partitions to `repartition`. This parallelism setting is already something users can configure.
2. Add a sort by the file path to cluster data by partition into manifests. The converted table will have better performance.

I think it should look like this:

```scala
val manifests = partitions.flatMap { row =>
  listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
}.repartition(spark.sqlContext.conf.numShufflePartitions)
  .orderBy($"path")
  .mapPartitions {
    ...
  }.collect().map(_.toManifestFile)
```

The repartition is still needed to ensure that the partition listing isn't
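To see why sorting by file path clusters partitions into manifests: Hive-style data file paths share a `name=value` directory prefix per partition, so after a sort the files of one partition are contiguous, and each contiguous chunk (one per task, which becomes one manifest) touches only a few partitions. A stdlib-only sketch that simulates this with plain collections instead of Spark; `manifestsFor`, `partitionSpread`, and the round-robin baseline are hypothetical:

```scala
object ManifestClusteringDemo {
  // Partition directory of a Hive-style data file path, e.g. "/t/day=1/f1.parquet" -> "/t/day=1".
  def partitionOf(path: String): String = path.substring(0, path.lastIndexOf('/'))

  // Splits files into `numManifests` groups; each group would become one manifest.
  // sorted = true mimics a sort by path (contiguous ranges);
  // sorted = false mimics an unordered round-robin spread of files.
  def manifestsFor(paths: Seq[String], numManifests: Int, sorted: Boolean): Seq[Seq[String]] =
    if (sorted) {
      val ordered = paths.sorted
      val chunk = math.max(1, math.ceil(ordered.size.toDouble / numManifests).toInt)
      ordered.grouped(chunk).toSeq
    } else {
      paths.zipWithIndex
        .groupBy(_._2 % numManifests)
        .toSeq
        .sortBy(_._1)
        .map(_._2.map(_._1))
    }

  // Total number of (manifest, partition) pairs; lower means better clustering,
  // because each manifest's partition summary then covers fewer partitions.
  def partitionSpread(manifests: Seq[Seq[String]]): Int =
    manifests.map(_.map(partitionOf).distinct.size).sum
}
```

With 4 partitions of 3 files each and 4 manifests, the sorted layout gives one partition per manifest, while the round-robin layout spreads every manifest across 3 partitions.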
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318677245

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -131,18 +136,22 @@ object SparkTableUtil {
           s"$name=${partition(name)}"
         }.mkString("/")
-      DataFiles.builder(spec)
-          .withPath(path)
-          .withFormat(format)
-          .withPartitionPath(partitionKey)
-          .withFileSizeInBytes(fileSize)
-          .withMetrics(new Metrics(rowCount,
-            arrayToMap(columnSizes),
-            arrayToMap(valueCounts),
-            arrayToMap(nullValueCounts),
-            arrayToMap(lowerBounds),
-            arrayToMap(upperBounds)))
-          .build()
+      var builder = DataFiles.builder(spec)
+        .withPath(path)
+        .withFormat(format)
+        .withFileSizeInBytes(fileSize)
+        .withMetrics(new Metrics(rowCount,
+          arrayToMap(columnSizes),
+          arrayToMap(valueCounts),
+          arrayToMap(nullValueCounts),
+          arrayToMap(lowerBounds),
+          arrayToMap(upperBounds)))
+
+      if (partitionKey == "") {
+        builder.build()
+      } else {
+        builder.withPartitionPath(partitionKey).build()

Review comment:
That's correct. Iceberg considers conversion from partition tuple to partition path a one-way conversion. We only want to use `withPartitionPath` for conversions like this one where we are importing data from Hive tables. In that case, it isn't clear what the date format will be because many people use strings for date partition fields. I think we could probably add support for ISO-8601 dates, but we want to avoid putting a lot of effort into parsing strings into partition values.
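Date partitions show why the conversion is one-way: Iceberg stores a `date` value as days since 1970-01-01, and rendering that as a path string is unambiguous, but a Hive directory name is just a string that may or may not be ISO-8601. A hedged sketch of what limited ISO-8601 support could look like, using `java.time.LocalDate`; `parseDatePartition` and `toPartitionString` are hypothetical helpers, not Iceberg APIs, and the parser deliberately rejects anything that is not `yyyy-MM-dd`:

```scala
import java.time.LocalDate
import java.time.format.DateTimeParseException

object DatePartitionDemo {
  // Converts a Hive partition string to a days-since-epoch value,
  // accepting only ISO-8601 local dates (yyyy-MM-dd); anything else is None.
  def parseDatePartition(value: String): Option[Int] =
    try Some(LocalDate.parse(value).toEpochDay.toInt)
    catch { case _: DateTimeParseException => None }

  // The forward direction is unambiguous: a date always renders the same way.
  def toPartitionString(epochDay: Int): String =
    LocalDate.ofEpochDay(epochDay.toLong).toString
}
```

Strings such as `20190827` or `08/27/2019` return `None`, which is the case the comment warns about: guessing their format would mean substantial string-parsing effort.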
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318669321

## File path: spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java ##
@@ -116,4 +123,28 @@ public void testPartitionScanByFilter() {
     Dataset partitionDF = SparkTableUtil.partitionDFByFilter(spark, qualifiedTableName, "data = 'a'");
     Assert.assertEquals("There should be 1 matching partition", 1, partitionDF.count());
   }
+
+  @Test
+  public void testImportPartitionedTable() throws Exception {

Review comment:
Actually, I think it is okay to commit this with support for Hadoop tables and add the Hive table support later.
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318254975

## File path: spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java ##
@@ -116,4 +123,28 @@ public void testPartitionScanByFilter() {
     Dataset partitionDF = SparkTableUtil.partitionDFByFilter(spark, qualifiedTableName, "data = 'a'");
     Assert.assertEquals("There should be 1 matching partition", 1, partitionDF.count());
   }
+
+  @Test
+  public void testImportPartitionedTable() throws Exception {

Review comment:
Can you run these tests to create Hive catalog tables as well?
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318254975

## File path: spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java ##
@@ -116,4 +123,28 @@ public void testPartitionScanByFilter() {
     Dataset partitionDF = SparkTableUtil.partitionDFByFilter(spark, qualifiedTableName, "data = 'a'");
     Assert.assertEquals("There should be 1 matching partition", 1, partitionDF.count());
   }
+
+  @Test
+  public void testImportPartitionedTable() throws Exception {

Review comment:
Can you run these tests with Hive tables as well?
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318200212

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -297,5 +302,81 @@ object SparkTableUtil {
         )
       }
     }
+
+  private def buildManifest(table: Table,
+                            sparkDataFiles: Seq[SparkDataFile],
+                            partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))

Review comment:
This can't be local because it is written on executors and read from the driver. If those are on different machines, the driver won't be able to find the location.
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317712572

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -297,5 +302,81 @@ object SparkTableUtil {
         )
       }
     }
+
+  private def buildManifest(table: Table,
+                            sparkDataFiles: Seq[SparkDataFile],
+                            partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))

Review comment:
Maybe you should have users pass in a location to use for this?
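If users pass in a staging location, each task still needs its own file name under it so that concurrent tasks don't overwrite each other's manifests. A later revision of this PR names the file after the stage ID and task attempt ID; a stdlib-only sketch of that naming, where `manifestPath` is a hypothetical helper and the IDs are passed in explicitly instead of being read from Spark's `TaskContext`:

```scala
object StagingPathDemo {
  // Joins a user-supplied staging directory with a per-task manifest file name.
  // In real code, stageId and taskAttemptId would come from TaskContext.get().
  def manifestPath(stagingDir: String, stageId: Int, taskAttemptId: Long): String = {
    val base = stagingDir.stripSuffix("/")
    s"$base/stage-$stageId-task-$taskAttemptId-manifest.avro"
  }
}
```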
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317712345

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -297,5 +302,81 @@ object SparkTableUtil {
         )
       }
     }
+
+  private def buildManifest(table: Table,
+                            sparkDataFiles: Seq[SparkDataFile],
+                            partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))

Review comment:
This location must be a shared location, either in HDFS or a distributed FS like S3. It can't be on the local host.
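A hedged sketch of a guard that rejects a staging location the driver could not read back, using `java.net.URI`. `requireSharedLocation` is hypothetical, and it only rejects an explicit `file:` scheme: a scheme-less path such as `/tmp/x` resolves against Hadoop's `fs.defaultFS`, which may or may not be a shared file system, so this check cannot catch every local path:

```scala
import java.net.URI

object SharedLocationDemo {
  // Rejects locations that explicitly point at a machine's local disk.
  // Scheme-less paths pass, because Hadoop resolves them against fs.defaultFS.
  def requireSharedLocation(location: String): String = {
    val scheme = Option(new URI(location).getScheme).map(_.toLowerCase)
    require(!scheme.contains("file"),
      s"Staging location must be on a shared file system (e.g. HDFS or S3), not local: $location")
    location
  }
}
```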
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317711705

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -297,5 +302,81 @@ object SparkTableUtil {
         )
       }
     }
+
+  private def buildManifest(table: Table,
+                            sparkDataFiles: Seq[SparkDataFile],
+                            partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+    val writer = ManifestWriter.write(partitionSpec, outputFile)
+    try {
+      sparkDataFiles.foreach { file =>
+        writer.add(file.toDataFile(partitionSpec))
+      }
+    } finally {
+      writer.close()
+    }
+
+    writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+    val sparkSession = SparkSession.builder().getOrCreate()
+    import sparkSession.sqlContext.implicits._
+
+    val dbName = source.database.getOrElse("default")
+    val tableName = source.table
+
+    if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+    }
+
+    val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+    val conf = sparkSession.sparkContext.hadoopConfiguration
+    val tables = new HadoopTables(conf)
+    val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName")
+    val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location)
+    val appender = table.newAppend()
+
+    if (partitionSpec == PartitionSpec.unpartitioned) {
+      val tableMetadata = sparkSession.sessionState.catalog.getTableMetadata(source)
+      val format = tableMetadata.provider.getOrElse("none")
+
+      if (format != "avro" && format != "parquet" && format != "orc") {
+        throw new UnsupportedOperationException(s"Unsupported format: $format")
+      }
+      listPartition(Map.empty[String, String], tableMetadata.location.toString,
+        format).foreach{f => appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+      appender.commit()
+    } else {
+      val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+      partitions.flatMap { row =>
+        listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
+      }.coalesce(1).mapPartitions {

Review comment:
And here's the `Manifest` case class:

```scala
private[sql] case class Manifest(location: String, fileLength: Long, specId: Int) {
  def toManifestFile: ManifestFile = new ManifestFile {
    override def path: String = location
    override def length: Long = fileLength
    override def partitionSpecId: Int = specId
    override def snapshotId: java.lang.Long = null
    override def addedFilesCount: Integer = null
    override def existingFilesCount: Integer = null
    override def deletedFilesCount: Integer = null
    override def partitions: java.util.List[ManifestFile.PartitionFieldSummary] = null
    override def copy: ManifestFile = this
  }
}
```
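The pattern behind this case class, as I read it (the comment does not spell it out): tasks return only plain fields (`location`, `fileLength`, `specId`), which Spark can handle because `import ...implicits._` provides encoders for case classes, and the driver rebuilds the `ManifestFile` interface with an anonymous class. A stdlib-only sketch of the same carrier pattern, with a hypothetical `ManifestInfo` trait standing in for Iceberg's much larger `ManifestFile` interface:

```scala
object ManifestCarrierDemo {
  // Hypothetical stand-in for Iceberg's ManifestFile interface (only a few members).
  trait ManifestInfo {
    def path: String
    def length: Long
    def partitionSpecId: Int
    def snapshotId: java.lang.Long // unknown until the append is committed
  }

  // Plain data carrier: just the fields, so it is cheap to encode and ship
  // from tasks to the driver.
  final case class Manifest(location: String, fileLength: Long, specId: Int) {
    // Rebuilds the interface view on the driver side.
    def toManifestInfo: ManifestInfo = new ManifestInfo {
      override def path: String = location
      override def length: Long = fileLength
      override def partitionSpecId: Int = specId
      override def snapshotId: java.lang.Long = null
    }
  }
}
```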
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317711446

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -297,5 +302,81 @@ object SparkTableUtil {
         )
       }
     }
+
+  private def buildManifest(table: Table,
+                            sparkDataFiles: Seq[SparkDataFile],
+                            partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+    val writer = ManifestWriter.write(partitionSpec, outputFile)
+    try {
+      sparkDataFiles.foreach { file =>
+        writer.add(file.toDataFile(partitionSpec))
+      }
+    } finally {
+      writer.close()
+    }
+
+    writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+    val sparkSession = SparkSession.builder().getOrCreate()
+    import sparkSession.sqlContext.implicits._
+
+    val dbName = source.database.getOrElse("default")
+    val tableName = source.table
+
+    if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+    }
+
+    val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+    val conf = sparkSession.sparkContext.hadoopConfiguration
+    val tables = new HadoopTables(conf)
+    val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName")
+    val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location)
+    val appender = table.newAppend()
+
+    if (partitionSpec == PartitionSpec.unpartitioned) {
+      val tableMetadata = sparkSession.sessionState.catalog.getTableMetadata(source)
+      val format = tableMetadata.provider.getOrElse("none")
+
+      if (format != "avro" && format != "parquet" && format != "orc") {
+        throw new UnsupportedOperationException(s"Unsupported format: $format")
+      }
+      listPartition(Map.empty[String, String], tableMetadata.location.toString,
+        format).foreach{f => appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+      appender.commit()
+    } else {
+      val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+      partitions.flatMap { row =>
+        listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
+      }.coalesce(1).mapPartitions {

Review comment:
Here's `writeManifest`:

```scala
def writeManifest(
    conf: SerializableConfiguration,
    spec: PartitionSpec,
    basePath: String): Iterator[SparkDataFile] => Iterator[Manifest] = {
  files =>
    if (files.hasNext) {
      val ctx = TaskContext.get()
      val manifestLocation = new Path(basePath,
        s"stage-${ctx.stageId()}-task-${ctx.taskAttemptId()}-manifest.avro").toString
      val io = new HadoopFileIO(conf.value)
      val writer = ManifestWriter.write(spec, io.newOutputFile(manifestLocation))
      try {
        files.foreach { file =>
          writer.add(file.toDataFile(spec))
        }
      } finally {
        writer.close()
      }

      val manifest = writer.toManifestFile
      Seq(Manifest(manifest.path, manifest.length, manifest.partitionSpecId)).iterator
    } else {
      Seq.empty.iterator
    }
}
```
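`writeManifest` returns a function rather than doing the work itself: the driver binds the configuration, spec, and base path once, and Spark runs the returned `Iterator[SparkDataFile] => Iterator[Manifest]` once per partition through `mapPartitions`. An empty partition produces no manifest instead of an empty file. A stdlib-only sketch of that shape; `DataFile`, `Manifest`, the `written` buffer, and the task-ID counter are hypothetical stand-ins for Iceberg's writer, Hadoop storage, and `TaskContext`:

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

object WriteManifestDemo {
  // Hypothetical stand-ins for SparkDataFile and the reviewer's Manifest case class.
  final case class DataFile(path: String, sizeInBytes: Long)
  final case class Manifest(location: String, fileCount: Int)

  // In-memory record of the "files" written to the staging location.
  val written: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // Stand-in for TaskContext.get().taskAttemptId(), which Spark assigns per task.
  private val taskAttempts = new AtomicLong(0L)

  // Binds per-job settings once; the returned function runs once per partition.
  def writeManifest(basePath: String): Iterator[DataFile] => Iterator[Manifest] = { files =>
    if (files.hasNext) {
      val location = s"$basePath/task-${taskAttempts.incrementAndGet()}-manifest.avro"
      // A real implementation would stream each file into a ManifestWriter here.
      val count = files.size
      written += location
      Iterator.single(Manifest(location, count))
    } else {
      // Empty partition: no manifest file is created and nothing is returned.
      Iterator.empty
    }
  }
}
```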
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317710889

## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ##
@@ -297,5 +302,81 @@ object SparkTableUtil {
         )
       }
     }
+
+  private def buildManifest(table: Table,
+                            sparkDataFiles: Seq[SparkDataFile],
+                            partitionSpec: PartitionSpec): ManifestFile = {
+    val outputFile = table.io
+      .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+    val writer = ManifestWriter.write(partitionSpec, outputFile)
+    try {
+      sparkDataFiles.foreach { file =>
+        writer.add(file.toDataFile(partitionSpec))
+      }
+    } finally {
+      writer.close()
+    }
+
+    writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+    val sparkSession = SparkSession.builder().getOrCreate()
+    import sparkSession.sqlContext.implicits._
+
+    val dbName = source.database.getOrElse("default")
+    val tableName = source.table
+
+    if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+    }
+
+    val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+    val conf = sparkSession.sparkContext.hadoopConfiguration
+    val tables = new HadoopTables(conf)
+    val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName")
+    val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location)
+    val appender = table.newAppend()
+
+    if (partitionSpec == PartitionSpec.unpartitioned) {
+      val tableMetadata = sparkSession.sessionState.catalog.getTableMetadata(source)
+      val format = tableMetadata.provider.getOrElse("none")
+
+      if (format != "avro" && format != "parquet" && format != "orc") {
+        throw new UnsupportedOperationException(s"Unsupported format: $format")
+      }
+      listPartition(Map.empty[String, String], tableMetadata.location.toString,
+        format).foreach{f => appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+      appender.commit()
+    } else {
+      val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+      partitions.flatMap { row =>
+        listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
+      }.coalesce(1).mapPartitions {
+        files =>
+          val tables = new HadoopTables(new Configuration())

Review comment:
Can you use `SerializableConfiguration` instead? This should use Spark's `hadoopConfiguration`.
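Background for this request: Hadoop's `Configuration` is not `java.io.Serializable`, so it can't be captured directly in a closure that Spark ships to executors, and `new Configuration()` on the executor silently drops the session's settings. `SerializableConfiguration` wrappers handle this by marking the configuration `@transient` and writing its contents by hand in `writeObject`/`readObject` (Hadoop's `Configuration` implements `Writable` for this). A stdlib-only sketch of the technique, with a hypothetical non-serializable `Conf` class in place of Hadoop's:

```scala
import java.io._

object SerializableConfDemo {
  // Hypothetical stand-in for Hadoop's Configuration: mutable and NOT Serializable.
  final class Conf {
    private val entries = scala.collection.mutable.LinkedHashMap.empty[String, String]
    def set(key: String, value: String): Unit = entries(key) = value
    def get(key: String): Option[String] = entries.get(key)
    def all: Seq[(String, String)] = entries.toSeq
  }

  // Serializable wrapper: the Conf field is transient and its entries are written by hand.
  final class SerializableConf(@transient var value: Conf) extends Serializable {
    private def writeObject(out: ObjectOutputStream): Unit = {
      out.defaultWriteObject()
      val pairs = value.all
      out.writeInt(pairs.size)
      pairs.foreach { case (k, v) => out.writeUTF(k); out.writeUTF(v) }
    }

    private def readObject(in: ObjectInputStream): Unit = {
      in.defaultReadObject()
      value = new Conf
      val n = in.readInt()
      (0 until n).foreach(_ => value.set(in.readUTF(), in.readUTF()))
    }
  }

  // Java-serialization round trip, similar to what happens when a task closure is shipped.
  def roundTrip(wrapper: SerializableConf): SerializableConf = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(wrapper) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[SerializableConf] finally in.close()
  }
}
```

In the PR, the driver would wrap `sparkSession.sparkContext.hadoopConfiguration` once and executors would read `.value`, so settings such as `fs.defaultFS` and credentials carry over.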
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317711151 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +302,81 @@ object SparkTableUtil { ) } } + + private def buildManifest(table: Table, + sparkDataFiles: Seq[SparkDataFile], + partitionSpec: PartitionSpec): ManifestFile = { +val outputFile = table.io + .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString)) +val writer = ManifestWriter.write(partitionSpec, outputFile) +try { + sparkDataFiles.foreach { file => +writer.add(file.toDataFile(partitionSpec)) + } +} finally { + writer.close() +} + +writer.toManifestFile + } + + /** + * Import a spark table to a iceberg table. + * + * The import uses the spark session to get table metadata. It assumes no + * operation is going on original table and target table and thus is not + * thread-safe. + * + * @param source the database name of the table to be import + * @param location the location used to store table metadata + * + * @return table the imported table + */ + def importSparkTable(source: TableIdentifier, location: String): Table = { +val sparkSession = SparkSession.builder().getOrCreate() +import sparkSession.sqlContext.implicits._ + +val dbName = source.database.getOrElse("default") +val tableName = source.table + +if (!sparkSession.catalog.tableExists(dbName, tableName)) { + throw new NoSuchTableException(s"Table $dbName.$tableName does not exist") +} + +val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName") +val conf = sparkSession.sparkContext.hadoopConfiguration +val tables = new HadoopTables(conf) +val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName") +val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location) +val appender = table.newAppend() + +if (partitionSpec == PartitionSpec.unpartitioned) { + val 
tableMetadata = sparkSession.sessionState.catalog.getTableMetadata(source) + val format = tableMetadata.provider.getOrElse("none") + + if (format != "avro" && format != "parquet" && format != "orc") { Review comment: This should use the same logic that `partitionDF` uses to detect the format. It should check the serde's input format.
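Here is a minimal sketch of format detection based on the serde and input format. It assumes the class names are read from Spark's `CatalogStorageFormat.inputFormat` and `.serde`, which are both `Option[String]`. `HiveFormatSketch` and `detectFormat` are hypothetical names, and this is not the actual `partitionDF` logic. The class names used in the checks are Hive's built-in Parquet, ORC, Avro, and text classes.

```scala
// Hypothetical helper (not part of SparkTableUtil): derive a file format name
// from a table's or partition's Hive storage descriptor strings, instead of
// relying on the Spark DataSource `provider` field.
object HiveFormatSketch {
  // Substrings expected in Hive's built-in serde / input format class names.
  private val knownFormats = Seq("parquet", "orc", "avro")

  /** Returns the first known format that appears in the input format or serde class name. */
  def detectFormat(inputFormat: Option[String], serde: Option[String]): Option[String] = {
    val candidates = (inputFormat.toSeq ++ serde.toSeq).map(_.toLowerCase)
    knownFormats.find(fmt => candidates.exists(_.contains(fmt)))
  }
}
```

Plain substring matching is crude. For example, a custom class whose name happens to contain `orc` would match. A real implementation may prefer an explicit table of known class names and should reject anything it does not recognize, such as `TextInputFormat`.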
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317710578 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +302,81 @@ object SparkTableUtil { ) } } + + private def buildManifest(table: Table, + sparkDataFiles: Seq[SparkDataFile], + partitionSpec: PartitionSpec): ManifestFile = { +val outputFile = table.io + .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString)) +val writer = ManifestWriter.write(partitionSpec, outputFile) +try { + sparkDataFiles.foreach { file => +writer.add(file.toDataFile(partitionSpec)) + } +} finally { + writer.close() +} + +writer.toManifestFile + } + + /** + * Import a spark table to a iceberg table. + * + * The import uses the spark session to get table metadata. It assumes no + * operation is going on original table and target table and thus is not + * thread-safe. + * + * @param source the database name of the table to be import + * @param location the location used to store table metadata + * + * @return table the imported table + */ + def importSparkTable(source: TableIdentifier, location: String): Table = { +val sparkSession = SparkSession.builder().getOrCreate() +import sparkSession.sqlContext.implicits._ + +val dbName = source.database.getOrElse("default") +val tableName = source.table + +if (!sparkSession.catalog.tableExists(dbName, tableName)) { + throw new NoSuchTableException(s"Table $dbName.$tableName does not exist") +} + +val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName") +val conf = sparkSession.sparkContext.hadoopConfiguration +val tables = new HadoopTables(conf) +val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName") +val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location) +val appender = table.newAppend() + +if (partitionSpec == PartitionSpec.unpartitioned) { + val 
tableMetadata = sparkSession.sessionState.catalog.getTableMetadata(source) + val format = tableMetadata.provider.getOrElse("none") + + if (format != "avro" && format != "parquet" && format != "orc") { +throw new UnsupportedOperationException(s"Unsupported format: $format") + } + listPartition(Map.empty[String, String], tableMetadata.location.toString, +format).foreach{f => appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))} + appender.commit() +} else { + val partitions = partitionDF(sparkSession, s"$dbName.$tableName") + partitions.flatMap { row => +listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2)) + }.coalesce(1).mapPartitions { Review comment: Why `coalesce(1)` here? In our version of this, we add a sort by file name and build manifests in parallel:

```scala
val tempPath = new Path(s"hdfs:/tmp/iceberg-conversions/$applicationId")
val manifests: Seq[ManifestFile] = files
  .repartition(100) // repartition to shuffle the data and not list partitions twice
  .orderBy($"path")
  .mapPartitions(writeManifest) // writes manifests to tempPath
  .collect()
  .map(_.toManifestFile)

val append = table.newAppend
manifests.foreach(append.appendManifest)
append.commit()
```
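The idea behind sorting by path and then writing one manifest per partition can be sketched without Spark. First sort the files by path. Next, cut the sorted list into contiguous groups. Finally, build one manifest per group concurrently. This is a local illustration only, under some assumptions. `FileEntry`, `ManifestSketch`, and `buildManifests` are hypothetical stand-ins for Iceberg's `DataFile`/`ManifestFile` and for Spark's range partitioning. Scala `Future`s take the place of Spark tasks.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for Iceberg's DataFile / ManifestFile; only paths are tracked.
final case class FileEntry(path: String)
final case class ManifestSketch(firstPath: String, lastPath: String, entries: Seq[FileEntry])

object ManifestGroupingSketch {
  /**
   * Sorts files by path, splits them into at most `numManifests` contiguous
   * groups, and builds one manifest per group concurrently.
   */
  def buildManifests(files: Seq[FileEntry], numManifests: Int): Seq[ManifestSketch] = {
    require(numManifests > 0, "numManifests must be positive")
    val sorted = files.sortBy(_.path)
    if (sorted.isEmpty) Seq.empty
    else {
      val groupSize = math.ceil(sorted.size.toDouble / numManifests).toInt
      // One asynchronous task per contiguous group of sorted paths.
      val pending = sorted.grouped(groupSize).toList.map { group =>
        Future(ManifestSketch(group.head.path, group.last.path, group))
      }
      // Future.sequence keeps the input order, so manifests stay sorted by path.
      Await.result(Future.sequence(pending), 1.minute)
    }
  }
}
```

The input is sorted before it is split, so each manifest covers a narrow, non-overlapping range of paths. This tends to keep files from the same partition directory together. A single `coalesce(1)` task would instead write every entry serially.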
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317708705 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +301,81 @@ object SparkTableUtil { ) } } + + private def buildManifest(table: Table, +sparkDataFiles: Seq[SparkDataFile], +partitionSpec: PartitionSpec): ManifestFile = { +val outputFile = table.io + .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString)) +val writer = ManifestWriter.write(partitionSpec, outputFile) +try { + for (file <- sparkDataFiles) { +writer.add(file.toDataFile(partitionSpec)) + } +} finally { + writer.close() +} + +writer.toManifestFile + } + + /** + * Import a spark table to a iceberg table. + * + * The import uses the spark session to get table metadata. It assumes no + * operation is going on original table and target table and thus is not + * thread-safe. + * + * @param source the database name of the table to be import + * @param location the location used to store table metadata + * + * @return table the imported table + */ + def importSparkTable(source: TableIdentifier, location: String): Table = { +val sparkSession = SparkSession.builder().getOrCreate() +import sparkSession.sqlContext.implicits._ + +val dbName = source.database.getOrElse("default") +val tableName = source.table + +if (!sparkSession.catalog.tableExists(dbName, tableName)) { + throw new NoSuchTableException(s"Table $dbName.$tableName does not exist") +} + +val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName") +val conf = sparkSession.sparkContext.hadoopConfiguration +val tables = new HadoopTables(conf) +val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName") +val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location) +val appender = table.newAppend() + +if (partitionSpec == PartitionSpec.unpartitioned) { Review 
comment: No, that's okay. If the table isn't partitioned, we don't need to get the partitions as a dataframe. It is fine to run this on the driver in that case.
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317377838 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +301,81 @@ object SparkTableUtil { ) } } + + private def buildManifest(table: Table, +sparkDataFiles: Seq[SparkDataFile], +partitionSpec: PartitionSpec): ManifestFile = { +val outputFile = table.io + .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString)) +val writer = ManifestWriter.write(partitionSpec, outputFile) +try { + for (file <- sparkDataFiles) { +writer.add(file.toDataFile(partitionSpec)) + } +} finally { + writer.close() +} + +writer.toManifestFile + } + + /** + * Import a spark table to a iceberg table. + * + * The import uses the spark session to get table metadata. It assumes no + * operation is going on original table and target table and thus is not + * thread-safe. + * + * @param source the database name of the table to be import + * @param location the location used to store table metadata + * + * @return table the imported table + */ + def importSparkTable(source: TableIdentifier, location: String): Table = { +val sparkSession = SparkSession.builder().getOrCreate() +import sparkSession.sqlContext.implicits._ + +val dbName = source.database.getOrElse("default") +val tableName = source.table + +if (!sparkSession.catalog.tableExists(dbName, tableName)) { + throw new NoSuchTableException(s"Table $dbName.$tableName does not exist") +} + +val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName") +val conf = sparkSession.sparkContext.hadoopConfiguration +val tables = new HadoopTables(conf) +val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName") +val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location) +val appender = table.newAppend() + +if (partitionSpec == PartitionSpec.unpartitioned) { Review 
comment: Why is there a special case for unpartitioned tables? Doesn't `partitionDF` return a single partition in that case?
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373956 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +301,81 @@ object SparkTableUtil { ) } } + + private def buildManifest(table: Table, +sparkDataFiles: Seq[SparkDataFile], +partitionSpec: PartitionSpec): ManifestFile = { +val outputFile = table.io + .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString)) +val writer = ManifestWriter.write(partitionSpec, outputFile) +try { + for (file <- sparkDataFiles) { Review comment: Style: please use `foreach` instead of `for` loops in scala.
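For reference, here is the requested style applied to the loop shape used in `buildManifest`. `RecordingWriter` is a hypothetical stand-in for a closeable writer such as `ManifestWriter`, so the example runs without Iceberg on the classpath. A `for` loop without `yield` desugars to `foreach`, so the change is purely stylistic.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a closeable writer such as Iceberg's ManifestWriter.
final class RecordingWriter extends AutoCloseable {
  val added: ArrayBuffer[String] = ArrayBuffer.empty[String]
  private var isClosed = false

  def closed: Boolean = isClosed

  def add(entry: String): Unit = {
    require(!isClosed, "writer is already closed")
    added += entry
  }

  override def close(): Unit = {
    isClosed = true
  }
}

object ForeachStyleSketch {
  /** Adds every file to a new writer and always closes it, mirroring buildManifest. */
  def writeAll(files: Seq[String]): RecordingWriter = {
    val writer = new RecordingWriter
    try {
      // Preferred style: foreach with a closure rather than `for (file <- files) { ... }`.
      files.foreach { file =>
        writer.add(file)
      }
    } finally {
      writer.close() // runs even if add throws
    }
    writer
  }
}
```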
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373945 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +301,81 @@ object SparkTableUtil { ) } } + + private def buildManifest(table: Table, +sparkDataFiles: Seq[SparkDataFile], Review comment: Nit: indentation is off.
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373916 ## File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java ## @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce .transportFactory(new TTransportFactory()) .protocolFactory(new TBinaryProtocol.Factory()) .minWorkerThreads(3) -.maxWorkerThreads(5); +.maxWorkerThreads(8); Review comment: What have previous tests not released? What is the "default release time"?
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373882 ## File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java ## @@ -111,6 +111,7 @@ private HiveConf newHiveConf(int port) { HiveConf newHiveConf = new HiveConf(new Configuration(), TestHiveMetastore.class); newHiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port); newHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + hiveLocalDir.getAbsolutePath()); +newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false"); Review comment: What warning?
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317373878 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -19,18 +19,22 @@ package org.apache.iceberg.spark +import com.google.common.collect.ImmutableMap import com.google.common.collect.Maps import java.nio.ByteBuffer import java.util +import java.util.UUID import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} -import org.apache.iceberg.{DataFile, DataFiles, Metrics, MetricsConfig, PartitionSpec} -import org.apache.iceberg.hadoop.HadoopInputFile +import org.apache.iceberg._ Review comment: We never use wildcard imports.
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315824567 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -19,18 +19,22 @@ package org.apache.iceberg.spark +import com.google.common.collect.ImmutableMap import com.google.common.collect.Maps import java.nio.ByteBuffer import java.util +import java.util.UUID import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} -import org.apache.iceberg.{DataFile, DataFiles, Metrics, MetricsConfig, PartitionSpec} -import org.apache.iceberg.hadoop.HadoopInputFile +import org.apache.iceberg._ Review comment: Please don't use wildcard imports.
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315824445 ## File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java ## @@ -111,6 +111,7 @@ private HiveConf newHiveConf(int port) { HiveConf newHiveConf = new HiveConf(new Configuration(), TestHiveMetastore.class); newHiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port); newHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + hiveLocalDir.getAbsolutePath()); +newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false"); Review comment: What does this do?
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315824375 ## File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java ## @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce .transportFactory(new TTransportFactory()) .protocolFactory(new TBinaryProtocol.Factory()) .minWorkerThreads(3) -.maxWorkerThreads(5); +.maxWorkerThreads(8); Review comment: Why was this change required?
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r314399797 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +301,88 @@ object SparkTableUtil { ) } } + + def buildManifest(table: Table, +sparkDataFiles: Seq[SparkDataFile], +partitionSpec: PartitionSpec): ManifestFile = { +val outputFile = table.io + .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString)) +val writer = ManifestWriter.write(partitionSpec, outputFile) +try { + for (file <- sparkDataFiles) { +writer.add(file.toDataFile(partitionSpec)) + } +} finally { + writer.close() +} + +writer.toManifestFile + } + + def partitionToMap(partition: String): Map[String, String] = { +val map = new mutable.HashMap[String, String]() +val list = partition.split("/") +list.foreach { str => + val kv = str.split("=") + map.put(kv(0), kv(1)) +} + +map.toMap + } + + /** + * Migrate a spark table to a iceberg table. + * + * The migration uses the spark session to get table metadata. It assumes no + * operation is going on original table and target table and thus is not + * thread-safe. + * + * @param dbName the database name of the table to be migrated + * @param tableName the table to be migrated + * @param table the target table to migrate in + * + * @return table the target table + */ + def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = { +val sparkSession = SparkSession.builder().getOrCreate() + +if (!sparkSession.catalog.tableExists(dbName, tableName)) { + throw new NoSuchTableException(s"Table $dbName.$tableName does not exist") +} + +val tableMetadata = sparkSession.sessionState.catalog. 
+ getTableMetadata(new TableIdentifier(tableName, Some(dbName))) + +val format = tableMetadata.provider.getOrElse("none") +if (format != "avro" && format != "parquet" && format != "orc") { + throw new UnsupportedOperationException(s"Unsupported format: $format") +} + +val location = tableMetadata.location.toString +val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName") + +val fastAppender = table.newFastAppend() + +val partitions = sparkSession.sessionState.catalog.externalCatalog + .listPartitionNames(dbName, tableName) +if (partitions.isEmpty) { + val dataFiles = SparkTableUtil.listPartition(Map.empty[String, String], location, format) + fastAppender.appendManifest(buildManifest(table, dataFiles, PartitionSpec.unpartitioned)) +} else { + // Retrieve data files according to partition. result = [[datafiles], [datafiles]] + val dataFiles = partitions.map { e => +SparkTableUtil.listPartition(partitionToMap(e), location + "/" + e, format) + } + + // Append manifest for each partition + dataFiles.foreach { partition => +fastAppender.appendManifest(buildManifest(table, partition, partitionSpec)) + } +} + +fastAppender.apply() Review comment: There's no need to call apply. Just commit.
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r314399700 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +301,88 @@ object SparkTableUtil { ) } } + + def buildManifest(table: Table, +sparkDataFiles: Seq[SparkDataFile], +partitionSpec: PartitionSpec): ManifestFile = { +val outputFile = table.io + .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString)) +val writer = ManifestWriter.write(partitionSpec, outputFile) +try { + for (file <- sparkDataFiles) { +writer.add(file.toDataFile(partitionSpec)) + } +} finally { + writer.close() +} + +writer.toManifestFile + } + + def partitionToMap(partition: String): Map[String, String] = { +val map = new mutable.HashMap[String, String]() +val list = partition.split("/") +list.foreach { str => + val kv = str.split("=") + map.put(kv(0), kv(1)) +} + +map.toMap + } + + /** + * Migrate a spark table to a iceberg table. + * + * The migration uses the spark session to get table metadata. It assumes no + * operation is going on original table and target table and thus is not + * thread-safe. + * + * @param dbName the database name of the table to be migrated + * @param tableName the table to be migrated + * @param table the target table to migrate in + * + * @return table the target table + */ + def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = { +val sparkSession = SparkSession.builder().getOrCreate() + +if (!sparkSession.catalog.tableExists(dbName, tableName)) { + throw new NoSuchTableException(s"Table $dbName.$tableName does not exist") +} + +val tableMetadata = sparkSession.sessionState.catalog. 
+ getTableMetadata(new TableIdentifier(tableName, Some(dbName))) + +val format = tableMetadata.provider.getOrElse("none") +if (format != "avro" && format != "parquet" && format != "orc") { + throw new UnsupportedOperationException(s"Unsupported format: $format") +} + +val location = tableMetadata.location.toString +val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName") + +val fastAppender = table.newFastAppend() + +val partitions = sparkSession.sessionState.catalog.externalCatalog + .listPartitionNames(dbName, tableName) Review comment: This should use `partitionDF` and call `listPartition` in parallel for large tables. See the example notebook: https://github.com/apache/incubator-iceberg/blob/master/examples/Convert%20table%20to%20Iceberg.ipynb
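Here is a Spark-free sketch of listing partitions in parallel. It assumes each partition is described by its partition values, location, and format. `PartitionRow`, `ListedFile`, `listAll`, and the `listDir` callback are hypothetical. In Spark, the same fan-out would happen by mapping over the `partitionDF` rows on executors. Here, `Future`s on the global execution context play that role.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for one row of partitionDF: partition values, location, format.
final case class PartitionRow(values: Map[String, String], location: String, format: String)
// Hypothetical stand-in for SparkDataFile.
final case class ListedFile(path: String, partition: Map[String, String], format: String)

object ParallelListingSketch {
  /**
   * Lists every partition concurrently using `listDir` (a placeholder for a
   * file-system listing) and flattens the results in input partition order.
   */
  def listAll(partitions: Seq[PartitionRow], listDir: String => Seq[String]): Seq[ListedFile] = {
    val perPartition: Seq[Future[Seq[ListedFile]]] = partitions.map { p =>
      Future {
        // Each partition keeps its own values and format, so mixed-format tables work.
        listDir(p.location).map(name => ListedFile(s"${p.location}/$name", p.values, p.format))
      }
    }
    Await.result(Future.sequence(perPartition), 1.minute).flatten
  }
}
```

The listing function is passed in so the example can run against an in-memory map. A real conversion would list each partition location on the file system.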
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r314399396 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +301,88 @@ object SparkTableUtil { ) } } + + def buildManifest(table: Table, +sparkDataFiles: Seq[SparkDataFile], +partitionSpec: PartitionSpec): ManifestFile = { +val outputFile = table.io + .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString)) +val writer = ManifestWriter.write(partitionSpec, outputFile) +try { + for (file <- sparkDataFiles) { +writer.add(file.toDataFile(partitionSpec)) + } +} finally { + writer.close() +} + +writer.toManifestFile + } + + def partitionToMap(partition: String): Map[String, String] = { +val map = new mutable.HashMap[String, String]() +val list = partition.split("/") +list.foreach { str => + val kv = str.split("=") + map.put(kv(0), kv(1)) +} + +map.toMap + } + + /** + * Migrate a spark table to a iceberg table. + * + * The migration uses the spark session to get table metadata. It assumes no + * operation is going on original table and target table and thus is not + * thread-safe. + * + * @param dbName the database name of the table to be migrated + * @param tableName the table to be migrated + * @param table the target table to migrate in + * + * @return table the target table + */ + def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = { +val sparkSession = SparkSession.builder().getOrCreate() + +if (!sparkSession.catalog.tableExists(dbName, tableName)) { + throw new NoSuchTableException(s"Table $dbName.$tableName does not exist") +} + +val tableMetadata = sparkSession.sessionState.catalog. + getTableMetadata(new TableIdentifier(tableName, Some(dbName))) + +val format = tableMetadata.provider.getOrElse("none") Review comment: This should convert any Hive table, not just Spark DataSource tables. 
That's why the `listPartition` method supports a format for each partition. Use `partitionDF` to get a dataframe with each partition and format.
[GitHub] [incubator-iceberg] rdblue commented on a change in pull request #374: Migrate spark table to iceberg table
rdblue commented on a change in pull request #374: Migrate spark table to iceberg table URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r314397922 ## File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala ## @@ -297,5 +301,88 @@ object SparkTableUtil { ) } } + + def buildManifest(table: Table, +sparkDataFiles: Seq[SparkDataFile], +partitionSpec: PartitionSpec): ManifestFile = { +val outputFile = table.io + .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString)) +val writer = ManifestWriter.write(partitionSpec, outputFile) +try { + for (file <- sparkDataFiles) { +writer.add(file.toDataFile(partitionSpec)) + } +} finally { + writer.close() +} + +writer.toManifestFile + } + + def partitionToMap(partition: String): Map[String, String] = { +val map = new mutable.HashMap[String, String]() +val list = partition.split("/") +list.foreach { str => + val kv = str.split("=") + map.put(kv(0), kv(1)) +} + +map.toMap + } + + /** + * Migrate a spark table to a iceberg table. + * + * The migration uses the spark session to get table metadata. It assumes no + * operation is going on original table and target table and thus is not + * thread-safe. + * + * @param dbName the database name of the table to be migrated + * @param tableName the table to be migrated + * @param table the target table to migrate in + * + * @return table the target table + */ + def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = { +val sparkSession = SparkSession.builder().getOrCreate() + +if (!sparkSession.catalog.tableExists(dbName, tableName)) { + throw new NoSuchTableException(s"Table $dbName.$tableName does not exist") +} + +val tableMetadata = sparkSession.sessionState.catalog. 
+ getTableMetadata(new TableIdentifier(tableName, Some(dbName))) + +val format = tableMetadata.provider.getOrElse("none") +if (format != "avro" && format != "parquet" && format != "orc") { + throw new UnsupportedOperationException(s"Unsupported format: $format") +} + +val location = tableMetadata.location.toString +val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName") + +val fastAppender = table.newFastAppend() Review comment: I don't think there is a reason to use fast appends.