[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-09-04 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r321016145
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(10);
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-09-01 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319799292
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +305,104 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(conf: SerializableConfiguration,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec,
+  basePath: String): Iterator[Manifest] = {
+if (sparkDataFiles.isEmpty) {
+  return Seq.empty.iterator
 
 Review comment:
   I see... Thanks





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-30 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319699743
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(10);
 
 Review comment:
   Makes sense, and the unit test passed when I switched back to using Spark's catalog.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-30 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319348741
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(10);
 
 Review comment:
   Agreed on making it configurable.
   
   A small thread count is fine with me as long as it is enough for the new tests. I tried to release the thread/connection when each unit test finishes by setting `requestTimeout` to a larger value and `stopTimeoutVal` to a smaller value, but neither worked as expected. Is there any way to do that? Thanks
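   Each open metastore client connection keeps one Thrift worker thread busy until the client disconnects, so `maxWorkerThreads` bounds how many connections the tests can hold open at once. The sketch below models that with a `java.util.concurrent.ThreadPoolExecutor` backed by a `SynchronousQueue`, which is an assumption about the server's pool rather than Thrift's own code, and reads the ceiling from a hypothetical configuration key (`metastore.test.max-worker-threads` is not a real Hive or Thrift property).

```java
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class WorkerPoolSketch {
  // Hypothetical setting name, used only for illustration.
  static final String MAX_WORKERS_KEY = "metastore.test.max-worker-threads";

  // Reads the pool ceiling from a config map, falling back to a default.
  static int resolveMaxWorkers(Map<String, String> conf, int minWorkers, int defaultMax) {
    String raw = conf.get(MAX_WORKERS_KEY);
    int max = raw == null ? defaultMax : Integer.parseInt(raw.trim());
    if (max < minWorkers) {
      throw new IllegalArgumentException(
          "max worker threads (" + max + ") must be >= min worker threads (" + minWorkers + ")");
    }
    return max;
  }

  // Models clients that each hold one worker thread until they disconnect,
  // and returns how many of them the pool accepted.
  static int acceptedConnections(int minWorkers, int maxWorkers, int clients) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        minWorkers, maxWorkers, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    CountDownLatch disconnect = new CountDownLatch(1);
    int accepted = 0;
    try {
      for (int i = 0; i < clients; i++) {
        try {
          pool.execute(() -> {
            try {
              disconnect.await(); // connection stays open, so the thread stays busy
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
          accepted++;
        } catch (RejectedExecutionException e) {
          // No idle worker and the pool is already at maxWorkers.
        }
      }
    } finally {
      disconnect.countDown();
      pool.shutdown();
    }
    return accepted;
  }
}
```

   In this model, `acceptedConnections(3, 5, 8)` accepts only five clients. Raising the ceiling (5 to 10 in the diff) postpones exhaustion, but threads are only freed when clients actually close their connections.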





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-30 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319444956
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +306,104 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(conf: SerializableConfiguration,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec,
+  basePath: String): Iterator[Manifest] = {
+if (sparkDataFiles.isEmpty) {
+  Seq.empty.iterator
+}
+
+val io = new HadoopFileIO(conf.get())
+val ctx = TaskContext.get()
+val location = new Path(basePath,
+  s"stage-${ctx.stageId()}-task-${ctx.taskAttemptId()}-manifest")
+val outputFile = io.newOutputFile(FileFormat.AVRO.addExtension(location.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  sparkDataFiles.foreach { file =>
+writer.add(file.toDataFile(partitionSpec))
 
 Review comment:
   The expected behavior is that Spark retries the task. If one of the tasks eventually fails, the job fails and the driver discards the manifests as well, so partial results are never committed.
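   The retry argument relies on each task attempt writing its own manifest. The sketch below mirrors the naming from the diff (`stage-<stageId>-task-<taskAttemptId>-manifest` plus the `.avro` extension) and models the driver as committing only when every task returned a manifest. Plain string joining stands in for Hadoop's `Path`, and representing a failed task as an empty `Optional` is a simplification: in Spark, a job failure surfaces as an exception on the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class ManifestStagingSketch {
  // Mirrors the diff's naming; taskAttemptId differs per attempt, so a retry
  // writes a new file instead of overwriting a failed attempt's output.
  static String manifestLocation(String basePath, int stageId, long taskAttemptId) {
    String base = basePath.endsWith("/") ? basePath.substring(0, basePath.length() - 1) : basePath;
    return base + "/stage-" + stageId + "-task-" + taskAttemptId + "-manifest.avro";
  }

  // Driver side: commit only if every task produced a manifest; otherwise commit nothing.
  static List<String> manifestsToCommit(List<Optional<String>> taskResults) {
    List<String> manifests = new ArrayList<>();
    for (Optional<String> result : taskResults) {
      if (!result.isPresent()) {
        return new ArrayList<>(); // a task failed for good: discard everything
      }
      manifests.add(result.get());
    }
    return manifests;
  }
}
```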





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-30 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319430647
 
 

 ##
 File path: spark/src/main/java/org/apache/iceberg/spark/hacks/Hive.java
 ##
 @@ -62,4 +63,16 @@ private Hive() {}
 
 return client.getPartitionsByFilter(client.getTable(db, table), exprs);
   }
+
+  public static CatalogTable getTable(SparkSession spark, String name) {
+List parts = Lists.newArrayList(Splitter.on('.').limit(2).split(name));
+String db = parts.size() == 1 ? "default" : parts.get(0);
+String table = parts.get(parts.size() == 1 ? 0 : 1);
+
+HiveClient client = HiveUtils$.MODULE$.newClientForMetadata(
 
 Review comment:
   OK, updated.
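   The `getTable` helper in the diff splits a possibly qualified name with Guava's `Splitter.on('.').limit(2)` and falls back to the `default` database. Below is a dependency-free sketch of the same parsing. It splits only at the first dot, so any further dots stay in the table part, as they would with `limit(2)`.

```java
final class TableNameSketch {
  // Returns {database, table}; a bare name falls back to the "default" database.
  static String[] parse(String name) {
    int dot = name.indexOf('.');
    if (dot < 0) {
      return new String[] {"default", name};
    }
    // Only the first '.' separates database from table (like limit(2)).
    return new String[] {name.substring(0, dot), name.substring(dot + 1)};
  }
}
```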





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-30 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319430539
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -131,18 +136,22 @@ object SparkTableUtil {
 s"$name=${partition(name)}"
   }.mkString("/")
 
-  DataFiles.builder(spec)
-  .withPath(path)
-  .withFormat(format)
-  .withPartitionPath(partitionKey)
-  .withFileSizeInBytes(fileSize)
-  .withMetrics(new Metrics(rowCount,
-arrayToMap(columnSizes),
-arrayToMap(valueCounts),
-arrayToMap(nullValueCounts),
-arrayToMap(lowerBounds),
-arrayToMap(upperBounds)))
-  .build()
+  var builder = DataFiles.builder(spec)
+.withPath(path)
+.withFormat(format)
+.withFileSizeInBytes(fileSize)
+.withMetrics(new Metrics(rowCount,
+  arrayToMap(columnSizes),
+  arrayToMap(valueCounts),
+  arrayToMap(nullValueCounts),
+  arrayToMap(lowerBounds),
+  arrayToMap(upperBounds)))
+
+  if (partitionKey == "") {
 
 Review comment:
   Updated.
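   The diff builds the partition key from `name=value` segments joined by `/`, and skips `withPartitionPath` when the key is empty, which is the unpartitioned case. Below is a small Java sketch of that logic. The field names and values are illustrative. When porting this to Java, note that Scala's `partitionKey == ""` compares by value, while Java needs `isEmpty()` or `equals` because `==` compares references.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

final class PartitionPathSketch {
  // Builds "name=value" segments in partition-field order, joined with "/".
  static String partitionKey(List<String> fieldNames, Map<String, String> partition) {
    StringJoiner joiner = new StringJoiner("/");
    for (String name : fieldNames) {
      joiner.add(name + "=" + partition.get(name));
    }
    return joiner.toString(); // "" when there are no partition fields
  }

  // An unpartitioned table yields an empty key, so no partition path is set.
  static boolean shouldSetPartitionPath(String partitionKey) {
    return !partitionKey.isEmpty();
  }
}
```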





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-29 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r319348741
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(10);
 
 Review comment:
   Agreed on making it configurable.
   
   A small thread count is fine with me as long as it is enough for the new tests. So I'd like to know: can we release the thread/connection when each unit test finishes?





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-28 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318666900
 
 

 ##
 File path: spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
 ##
 @@ -116,4 +123,28 @@ public void testPartitionScanByFilter() {
 Dataset partitionDF = SparkTableUtil.partitionDFByFilter(spark, qualifiedTableName, "data = 'a'");
 Assert.assertEquals("There should be 1 matching partition", 1, partitionDF.count());
   }
+
+  @Test
+  public void testImportPartitionedTable() throws Exception {
 
 Review comment:
   Currently, it creates a Hadoop table internally. I can change the signature to accept a `Table` parameter so that we can import into a specific table. You mean testing both an import as a Hadoop table and an import as a Hive table, right?





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-27 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318368167
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
 
 Review comment:
   Understood. I was still thinking of serializing the manifest file.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-27 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318176144
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  sparkDataFiles.foreach { file =>
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
+  val tableMetadata = sparkSession.sessionState.catalog.getTableMetadata(source)
+  val format = tableMetadata.provider.getOrElse("none")
+
+  if (format != "avro" && format != "parquet" && format != "orc") {
+throw new UnsupportedOperationException(s"Unsupported format: $format")
+  }
+  listPartition(Map.empty[String, String], tableMetadata.location.toString,
+format).foreach{f => appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+  appender.commit()
+} else {
+  val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
+  partitions.flatMap { row =>
+listPartition(row.getMap[String, String](0).toMap, row.getString(1), row.getString(2))
+  }.coalesce(1).mapPartitions {
 
 Review comment:
   That's a temporary workaround. I left comments to discuss a way to serialize the manifest info [here](https://github.com/apache/incubator-iceberg/pull/374#issuecomment-522904779).
 A case class looks like a good way to do that; I will copy this.
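   The comment settles on a case class as the way to serialize manifest info. The closest Java analogue is a small immutable `Serializable` value class. The field set below (path, length, partition spec ID) is an assumption chosen for illustration. The round trip uses plain Java serialization, which is roughly what Spark's default `JavaSerializer` does when it ships task results to the driver.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

// Hypothetical holder; which fields the driver actually needs is an assumption.
final class SerializableManifestInfo implements Serializable {
  private static final long serialVersionUID = 1L;

  final String path;
  final long length;
  final int partitionSpecId;

  SerializableManifestInfo(String path, long length, int partitionSpecId) {
    this.path = path;
    this.length = length;
    this.partitionSpecId = partitionSpecId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SerializableManifestInfo)) {
      return false;
    }
    SerializableManifestInfo other = (SerializableManifestInfo) o;
    return length == other.length
        && partitionSpecId == other.partitionSpecId
        && path.equals(other.path);
  }

  @Override
  public int hashCode() {
    return Objects.hash(path, length, partitionSpecId);
  }

  // Serializes to bytes and back, as would happen when a task result crosses the wire.
  static SerializableManifestInfo roundTrip(SerializableManifestInfo info) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(info);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SerializableManifestInfo) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```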





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-27 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318174610
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  sparkDataFiles.foreach { file =>
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
+  val tableMetadata = sparkSession.sessionState.catalog.getTableMetadata(source)
+  val format = tableMetadata.provider.getOrElse("none")
+
+  if (format != "avro" && format != "parquet" && format != "orc") {
 
 Review comment:
   OK, will update.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-27 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r318171088
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +302,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+  sparkDataFiles: Seq[SparkDataFile],
+  partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
 
 Review comment:
   Agreed. I just want to confirm: why can't it be a local file? It is only a temporary file, and I wanted to store it in the local tmpfs to save some I/O.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317379934
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+sparkDataFiles: Seq[SparkDataFile],
+partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  for (file <- sparkDataFiles) {
 
 Review comment:
   OK, will update.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317379931
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,81 @@ object SparkTableUtil {
   )
 }
   }
+
+  private def buildManifest(table: Table,
+sparkDataFiles: Seq[SparkDataFile],
+partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  for (file <- sparkDataFiles) {
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  /**
+   * Import a spark table to a iceberg table.
+   *
+   * The import uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param source the database name of the table to be import
+   * @param location the location used to store table metadata
+   *
+   * @return table the imported table
+   */
+  def importSparkTable(source: TableIdentifier, location: String): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+import sparkSession.sqlContext.implicits._
+
+val dbName = source.database.getOrElse("default")
+val tableName = source.table
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+}
+
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+val conf = sparkSession.sparkContext.hadoopConfiguration
+val tables = new HadoopTables(conf)
+val schema = SparkSchemaUtil.schemaForTable(sparkSession, s"$dbName.$tableName")
+val table = tables.create(schema, partitionSpec, ImmutableMap.of(), location)
+val appender = table.newAppend()
+
+if (partitionSpec == PartitionSpec.unpartitioned) {
 
 Review comment:
   Yes, here is the exception:
   ```
   org.apache.hadoop.hive.ql.metadata.HiveException: Table item is not a partitioned table
     at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2123)
     at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2156)
     at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitions$1.apply(HiveClientImpl.scala:670)
     at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getPartitions$1.apply(HiveClientImpl.scala:662)
     at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:275)
     at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:213)
     at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:212)
     at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:258)
     at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:662)
     at org.apache.spark.sql.hive.client.HiveClient$class.getPartitions(HiveClient.scala:210)
     at org.apache.spark.sql.hive.client.HiveClientImpl.getPartitions(HiveClientImpl.scala:83)
     at org.apache.iceberg.spark.hacks.Hive.partitions(Hive.java:48)
     at org.apache.iceberg.spark.SparkTableUtil$.partitionDF(SparkTableUtil.scala:56)
   ```
   
   If that is not expected, I can try to fix that first.
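   The stack trace shows that asking the metastore for the partitions of an unpartitioned table throws. This is why `importSparkTable` checks for `PartitionSpec.unpartitioned` before calling `partitionDF`. Below is a minimal sketch of that guard. The partition lookup is a hypothetical `Supplier` that is invoked only for partitioned tables; in the unpartitioned case, the table's own location is the single directory to import.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

final class ImportSourcesSketch {
  // Returns the directories whose data files should be imported.
  // partitionLocations is never called for an unpartitioned table, because the
  // metastore rejects that request ("... is not a partitioned table").
  static List<String> dataLocations(
      boolean partitioned, String tableLocation, Supplier<List<String>> partitionLocations) {
    if (!partitioned) {
      return Collections.singletonList(tableLocation);
    }
    return partitionLocations.get();
  }
}
```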





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317379712
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -19,18 +19,22 @@
 
 package org.apache.iceberg.spark
 
+import com.google.common.collect.ImmutableMap
 import com.google.common.collect.Maps
 import java.nio.ByteBuffer
 import java.util
+import java.util.UUID
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.iceberg.{DataFile, DataFiles, Metrics, MetricsConfig, PartitionSpec}
-import org.apache.iceberg.hadoop.HadoopInputFile
+import org.apache.iceberg._
 
 Review comment:
   I see, will update in next commit.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317379704
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -111,6 +111,7 @@ private HiveConf newHiveConf(int port) {
 HiveConf newHiveConf = new HiveConf(new Configuration(), TestHiveMetastore.class);
 newHiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
 newHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + hiveLocalDir.getAbsolutePath());
+newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
 
 Review comment:
   WARN org.apache.hadoop.hive.metastore.ObjectStore - Direct SQL failed, falling back to ORM
   java.lang.ClassCastException: org.apache.derby.impl.jdbc.EmbedClob cannot be cast to java.lang.String.
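   For reference, the `HiveConf.ConfVars` entries set in `newHiveConf` correspond to the property names below. Turning off `hive.metastore.try.direct.sql` makes the metastore use the ORM path directly instead of first trying direct SQL, which failed against embedded Derby with the `ClassCastException` above. The sketch only builds the map of overrides; the port and warehouse directory are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TestMetastoreConfSketch {
  // String keys behind the HiveConf.ConfVars constants used in newHiveConf.
  static Map<String, String> overrides(int port, String warehouseDir) {
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("hive.metastore.uris", "thrift://localhost:" + port);       // METASTOREURIS
    conf.put("hive.metastore.warehouse.dir", "file:" + warehouseDir);    // METASTOREWAREHOUSE
    // METASTORE_TRY_DIRECT_SQL: skip direct SQL and go straight to the ORM.
    conf.put("hive.metastore.try.direct.sql", "false");
    return conf;
  }
}
```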
   
   





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-24 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r317379695
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(8);
 
 Review comment:
   It is a new unit test I added, unrelated to the previous one.
   
   The default release time refers to `hive.server2.thrift.http.worker.keepalive.time`, which corresponds to `stopTimeoutVal` in TThreadPoolServer and defaults to 60s. (I referred to `hive.server2.thrift.http.max.idle.time` by mistake.)
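   The comment maps the release time to `stopTimeoutVal`. Assume the Thrift pool behaves like a `java.util.concurrent.ThreadPoolExecutor` whose keep-alive is that value. This is an assumption for illustration, not something taken from the Thrift source. Under that assumption, the sketch shows what a keep-alive can and cannot do. It only reclaims threads above the minimum, and only after they have been idle. A worker serving a client that never disconnects is never idle, which may explain why a smaller `stopTimeoutVal` did not free threads between tests.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveSketch {
  // Returns {pool size while all clients are connected, pool size after they disconnect and idle}.
  static int[] poolSizes(int minWorkers, int maxWorkers, long keepAliveMillis, long idleWaitMillis) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        minWorkers, maxWorkers, keepAliveMillis, TimeUnit.MILLISECONDS,
        new SynchronousQueue<Runnable>());
    CountDownLatch connected = new CountDownLatch(maxWorkers);
    CountDownLatch disconnect = new CountDownLatch(1);
    try {
      for (int i = 0; i < maxWorkers; i++) {
        pool.execute(() -> {
          connected.countDown();
          try {
            disconnect.await(); // busy for as long as the client stays connected
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      connected.await();
      int whileConnected = pool.getPoolSize();
      disconnect.countDown();       // every client closes its connection
      Thread.sleep(idleWaitMillis); // stay idle well past the keep-alive
      // Threads above minWorkers have timed out; core threads do not time out by default.
      return new int[] {whileConnected, pool.getPoolSize()};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      disconnect.countDown();
      pool.shutdownNow();
    }
  }
}
```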


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org



[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-20 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table 
to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315939963
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -19,18 +19,22 @@
 
 package org.apache.iceberg.spark
 
+import com.google.common.collect.ImmutableMap
 import com.google.common.collect.Maps
 import java.nio.ByteBuffer
 import java.util
+import java.util.UUID
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.iceberg.{DataFile, DataFiles, Metrics, MetricsConfig, PartitionSpec}
-import org.apache.iceberg.hadoop.HadoopInputFile
+import org.apache.iceberg._
 
 Review comment:
   It looks like we need to import 9 entities, which would exceed the 100-character line length limit. Spark's Scala coding style prefers a wildcard import when importing more than 6 entities. I just want to confirm which we prefer: a line break, or following Spark's coding style for Scala?





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-20 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315936341
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -19,18 +19,22 @@
 
 package org.apache.iceberg.spark
 
+import com.google.common.collect.ImmutableMap
 import com.google.common.collect.Maps
 import java.nio.ByteBuffer
 import java.util
+import java.util.UUID
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.iceberg.{DataFile, DataFiles, Metrics, MetricsConfig, PartitionSpec}
-import org.apache.iceberg.hadoop.HadoopInputFile
+import org.apache.iceberg._
 
 Review comment:
   Hmm... IntelliJ IDEA always optimizes the imports when I add a new one with Alt+Enter. Let me update it.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-20 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315935961
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -111,6 +111,7 @@ private HiveConf newHiveConf(int port) {
 HiveConf newHiveConf = new HiveConf(new Configuration(), TestHiveMetastore.class);
 newHiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
 newHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + hiveLocalDir.getAbsolutePath());
+newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
 
 Review comment:
   This is used to eliminate a warning in the unit test.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-20 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315935720
 
 

 ##
 File path: hive/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java
 ##
 @@ -102,7 +102,7 @@ private TServer newThriftServer(TServerSocket socket, HiveConf conf) throws Exce
 .transportFactory(new TTransportFactory())
 .protocolFactory(new TBinaryProtocol.Factory())
 .minWorkerThreads(3)
-.maxWorkerThreads(5);
+.maxWorkerThreads(8);
 
 Review comment:
   This is needed for the unit test. The Spark job needs at least three Thrift server threads because the unit test has three partitions. A maximum of 5 should have been enough, but threads from the previous unit tests had not been released yet (the default release time is 1800s), which caused failures to connect to the Thrift server.
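   A JDK-only sketch of why a small `maxWorkerThreads` can cause connection failures. It assumes, without verifying it for the Thrift version used here, that TThreadPoolServer's default executor is a `ThreadPoolExecutor` sized by `minWorkerThreads`/`maxWorkerThreads` with a `SynchronousQueue` hand-off, so each new connection needs a free worker thread. Every blocked task below stands in for a connection that is still held open; `WorkerLimitDemo` and `countRejected` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a hand-off pool rejects work once maxWorkers threads are busy.
final class WorkerLimitDemo {

  /** Submits `connections` tasks that block until released and returns how many were rejected. */
  static int countRejected(int minWorkers, int maxWorkers, int connections) {
    // Assumed to resemble Thrift's default executor: no buffering between accept and worker.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        minWorkers, maxWorkers, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
    CountDownLatch release = new CountDownLatch(1);
    int rejected = 0;
    try {
      for (int i = 0; i < connections; i++) {
        try {
          pool.execute(() -> {
            try {
              release.await(); // hold the worker thread, like an open connection
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          });
        } catch (RejectedExecutionException e) {
          rejected++; // no idle worker and maxWorkers threads already running
        }
      }
    } finally {
      release.countDown(); // let the blocked tasks finish
      pool.shutdown();
    }
    return rejected;
  }

  public static void main(String[] args) {
    System.out.println(countRejected(3, 5, 7));
    System.out.println(countRejected(3, 8, 7));
  }
}
```

   With 7 concurrent "connections", a cap of 5 workers rejects 2 of them while a cap of 8 rejects none.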





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-19 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315162272
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,88 @@ object SparkTableUtil {
   )
 }
   }
+
+  def buildManifest(table: Table,
+sparkDataFiles: Seq[SparkDataFile],
+partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  for (file <- sparkDataFiles) {
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  def partitionToMap(partition: String): Map[String, String] = {
+val map = new mutable.HashMap[String, String]()
+val list = partition.split("/")
+list.foreach { str =>
+  val kv = str.split("=")
+  map.put(kv(0), kv(1))
+}
+
+map.toMap
+  }
+
+  /**
+   * Migrate a spark table to a iceberg table.
+   *
+   * The migration uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param dbName the database name of the table to be migrated
+   * @param tableName the table to be migrated
+   * @param table the target table to migrate in
+   *
+   * @return table the target table
+   */
+  def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = {
 
 Review comment:
   Aha, it does behave like "import data". When I first tried Iceberg, the first thing I had to do was "import table". If we want to implement a migration that also replaces the table pointer, more steps will be needed; I can look into that further. At this stage, I'd like to implement "import data", since IIUC that is also the first step of a migration.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-19 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r315150212
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,88 @@ object SparkTableUtil {
   )
 }
   }
+
+  def buildManifest(table: Table,
+sparkDataFiles: Seq[SparkDataFile],
+partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  for (file <- sparkDataFiles) {
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  def partitionToMap(partition: String): Map[String, String] = {
+val map = new mutable.HashMap[String, String]()
+val list = partition.split("/")
+list.foreach { str =>
+  val kv = str.split("=")
+  map.put(kv(0), kv(1))
+}
+
+map.toMap
+  }
+
+  /**
+   * Migrate a spark table to a iceberg table.
+   *
+   * The migration uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param dbName the database name of the table to be migrated
+   * @param tableName the table to be migrated
+   * @param table the target table to migrate in
+   *
+   * @return table the target table
+   */
+  def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = {
 
 Review comment:
   Makes sense. I will update it in the next commit.





[GitHub] [incubator-iceberg] chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table

2019-08-16 Thread GitBox
chenjunjiedada commented on a change in pull request #374: Migrate spark table to iceberg table
URL: https://github.com/apache/incubator-iceberg/pull/374#discussion_r314771349
 
 

 ##
 File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
 ##
 @@ -297,5 +301,88 @@ object SparkTableUtil {
   )
 }
   }
+
+  def buildManifest(table: Table,
+sparkDataFiles: Seq[SparkDataFile],
+partitionSpec: PartitionSpec): ManifestFile = {
+val outputFile = table.io
+  .newOutputFile(FileFormat.AVRO.addExtension("/tmp/" + UUID.randomUUID.toString))
+val writer = ManifestWriter.write(partitionSpec, outputFile)
+try {
+  for (file <- sparkDataFiles) {
+writer.add(file.toDataFile(partitionSpec))
+  }
+} finally {
+  writer.close()
+}
+
+writer.toManifestFile
+  }
+
+  def partitionToMap(partition: String): Map[String, String] = {
+val map = new mutable.HashMap[String, String]()
+val list = partition.split("/")
+list.foreach { str =>
+  val kv = str.split("=")
+  map.put(kv(0), kv(1))
+}
+
+map.toMap
+  }
+
+  /**
+   * Migrate a spark table to a iceberg table.
+   *
+   * The migration uses the spark session to get table metadata. It assumes no
+   * operation is going on original table and target table and thus is not
+   * thread-safe.
+   *
+   * @param dbName the database name of the table to be migrated
+   * @param tableName the table to be migrated
+   * @param table the target table to migrate in
+   *
+   * @return table the target table
+   */
+  def migrateSparkTable(dbName: String, tableName: String, table: Table): Table = {
+val sparkSession = SparkSession.builder().getOrCreate()
+
+if (!sparkSession.catalog.tableExists(dbName, tableName)) {
+  throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+}
+
+val tableMetadata = sparkSession.sessionState.catalog.
+  getTableMetadata(new TableIdentifier(tableName, Some(dbName)))
+
+val format = tableMetadata.provider.getOrElse("none")
+if (format != "avro" && format != "parquet" && format != "orc") {
+  throw new UnsupportedOperationException(s"Unsupported format: $format")
+}
+
+val location = tableMetadata.location.toString
+val partitionSpec = SparkSchemaUtil.specForTable(sparkSession, s"$dbName.$tableName")
+
+val fastAppender = table.newFastAppend()
+
+val partitions = sparkSession.sessionState.catalog.externalCatalog
+  .listPartitionNames(dbName, tableName)
 
 Review comment:
   I see. One thing I need to confirm: do we need to use repartition to avoid creating too many manifests? In this case we'd like to commit only once after the Spark job, so I don't think we need repartition.
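   The partition names returned by `listPartitionNames` in the hunk above are Hive-style `key=value/key=value` strings, which the quoted `partitionToMap` splits on `/` and `=`. Below is a Java sketch of the same parsing under an assumption not verified here: that the names come back in Hive's escaped form, where characters such as `/` and `=` inside a value are written as `%XX` hex escapes. It splits each fragment on the first `=` only and unescapes both sides. `PartitionNames` is a hypothetical helper, not part of Iceberg or Spark.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parse an escaped Hive-style partition name into an ordered map.
final class PartitionNames {

  /** Parses "k1=v1/k2=v2" into an insertion-ordered map, unescaping %XX in keys and values. */
  static Map<String, String> partitionToMap(String partition) {
    Map<String, String> map = new LinkedHashMap<>();
    for (String fragment : partition.split("/")) {
      int eq = fragment.indexOf('='); // split on the first '=' only
      if (eq < 0) {
        throw new IllegalArgumentException("Invalid partition fragment: " + fragment);
      }
      map.put(unescape(fragment.substring(0, eq)), unescape(fragment.substring(eq + 1)));
    }
    return map;
  }

  /** Turns each %XX (two hex digits) into the corresponding character; other text is kept. */
  static String unescape(String s) {
    StringBuilder sb = new StringBuilder(s.length());
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      if (c == '%' && i + 2 < s.length()) {
        int hi = Character.digit(s.charAt(i + 1), 16);
        int lo = Character.digit(s.charAt(i + 2), 16);
        if (hi >= 0 && lo >= 0) {
          sb.append((char) (hi * 16 + lo));
          i += 2; // skip the two hex digits
          continue;
        }
      }
      sb.append(c);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(partitionToMap("ds=2019-08-16/hr=00"));
    System.out.println(partitionToMap("path=a%2Fb%3Dc"));
  }
}
```

   Unlike `kv(1)` after a plain `split("=")`, splitting on the first `=` keeps the whole value even if it contains an unescaped `=`.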

