[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-586284798

@vinothchandar I've managed to reproduce it with a simple spark.parallelize() example.

test.scala
```
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{month, year, col, dayofmonth}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame

case class Bar(id: Int, name: String)

// choose one of the following
case class Foo(id: Int, bar: Bar) // with simple field
// case class Foo(bar: Bar)       // withOUT simple field

case class Root(id: Int, foos: Array[Foo])

object HudiScalaStreamHelloWorld {
  def main(args: Array[String]): Unit = {
    val appName = "ScalaStreamExample"
    val batchInterval = Milliseconds(2000)
    val spark = SparkSession
      .builder()
      .appName(appName)
      .getOrCreate()

    val sparkContext = spark.sparkContext
    val streamingContext = new StreamingContext(sparkContext, batchInterval)

    import spark.implicits._
    val sc = sparkContext

    // choose one of the following
    val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(1, Bar(1, "OneBar")))))).toDF() // with simple field
    // val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(Bar(1, "OneBar")))))).toDF() // withOUT simple field

    dataFrame.printSchema()

    val hudiTableName = "order"
    val hudiTablePath = "s3://xxx/path/" + hudiTableName

    // Set up our Hudi Data Source Options
    val hudiOptions = Map[String, String](
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
      HoodieWriteConfig.TABLE_NAME -> hudiTableName,
      DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id")

    dataFrame.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
  }
}
```

deploy.sh
```
sbt clean
sbt package
aws s3 cp ./target/scala-2.11/simple-project_2.11-1.0.jar s3://./simple-project_2.11-1.0.jar

aws emr add-steps --cluster-id j-AZQBZK81NAFT --steps Type=spark,Name=SimpleHudiTest,Args=[\
--deploy-mode,cluster,\
--master,yarn,\
--packages,\'org.apache.hudi:hudi-spark-bundle:0.5.0-incubating,org.apache.spark:spark-avro_2.11:2.4.4\',\
--conf,spark.yarn.submit.waitAppCompletion=false,\
--conf,yarn.log-aggregation-enable=true,\
--conf,spark.dynamicAllocation.enabled=true,\
--conf,spark.cores.max=4,\
--conf,spark.network.timeout=300,\
--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\
--conf,spark.sql.hive.convertMetastoreParquet=false,\
--class,HudiScalaStreamHelloWorld,\
s3://.xxx/simple-project_2.11-1.0.jar\
],ActionOnFailure=CONTINUE
```

build.sbt
```
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.4"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4"
libraryDependencies += "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating"

scalacOptions := Seq("-unchecked", "-deprecation")
```

An AWS Glue crawler runs over the output S3 directory. From the Presto EMR instance, this is the result when the simple field is included on the array item:
```
presto:schema> select * from default;
 _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | foos
---------------------+----------------------+--------------------+------------------------+-------------------+----+------
 20200214112552      | 20200214112552_0_1   | 1                  | default                |
```
[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-585777129

Had some success: if I add a property of a simple type (i.e. not a nested object) to the object, Hudi saves the parquet and the Presto queries (both `select *` and the nested select) appear to work.
```
const valueToUse = {
  ...order,
  tickets: {
    items: [
      {
        dummyId: 2, // <-- added this property
        ticketType: {
          id: 258815,
        },
      },
    ],
  },
};
```
Just to be sure, I saved the same dataframe to plain parquet using
```
dataFrame.withColumn("year", year(col("eventTimestamp")))
  .withColumn("month", month(col("eventTimestamp")))
  .withColumn("day", dayofmonth(col("eventTimestamp")))
  .write
  .mode(SaveMode.Append)
  .partitionBy(typeOfEventColumnName, "year", "month", "day")
  .parquet(rawOutPath)
```
and had an AWS Glue crawler run over it to create a table. Then I ran a query in Presto against that table: `select * from table` worked fine, whereas on the Hudi parquet it failed.
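A way to see what actually differs between the two outputs (a sketch; both paths below are placeholders for the locations used above) is to read each back with Spark and compare the printed schemas, since Hudi derives its Parquet schema via Avro conversion (see `AvroConversionUtils` in the later stack trace) while the plain write uses Spark's native Parquet schema:
```
// Sketch: read both outputs back and diff the schemas.
// Paths are placeholders for the raw and Hudi locations used above.
val plainParquet = spark.read.parquet("s3://xxx/raw/")                           // written by dataFrame.write.parquet
val hudiParquet  = spark.read.parquet("s3://xxx/path/order/default/*.parquet")   // data files written by Hudi
plainParquet.printSchema()
hudiParquet.printSchema()
```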
[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-585748045

Further update: I've narrowed it down to the array on the nested object. The following works when I place this object on the kinesis stream
```
const valueToUse = {
  ...order,
  tickets: {
    items: [],
  },
};
```
but it fails when the object placed on the kinesis stream has an item in the array
```
const valueToUse = {
  ...order,
  tickets: {
    items: [
      {
        ticketType: {
          id: 258815,
        },
      },
    ],
  },
};
```
Error message and stack trace when running `select * from table`:
```
Query 20200213_131029_00032_hej8h failed: No value present
java.util.NoSuchElementException: No value present
	at java.util.Optional.get(Optional.java:135)
	at com.facebook.presto.parquet.reader.ParquetReader.readArray(ParquetReader.java:156)
	at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:282)
	at com.facebook.presto.parquet.reader.ParquetReader.readStruct(ParquetReader.java:193)
	at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:276)
	at com.facebook.presto.parquet.reader.ParquetReader.readStruct(ParquetReader.java:193)
	at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:276)
	at com.facebook.presto.parquet.reader.ParquetReader.readStruct(ParquetReader.java:193)
	at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:276)
	at com.facebook.presto.parquet.reader.ParquetReader.readBlock(ParquetReader.java:268)
	at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:247)
	at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:225)
	at com.facebook.presto.spi.block.LazyBlock.assureLoaded(LazyBlock.java:283)
	at com.facebook.presto.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:274)
	at com.facebook.presto.spi.Page.getLoadedPage(Page.java:261)
	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:254)
	at com.facebook.presto.operator.Driver.processInternal(Driver.java:379)
	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:283)
	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:675)
	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1077)
	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:483)
	at com.facebook.presto.$gen.Presto_0_22720200211_134743_1.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
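The `Optional.get` failure inside `ParquetReader.readArray` suggests this Presto version can't locate the element field it expects in the list type Hudi wrote. To confirm only the nested-array column trips the reader, one could select columns individually (a sketch; the table and column names here are hypothetical stand-ins for the Glue-crawled table):
```
-- hypothetical table/column names; top-level simple columns should read fine
select id, clientid, typeofevent from hudi_orders;

-- forcing Presto to read the nested array should reproduce "No value present"
select "order".tickets.items from hudi_orders;
```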
[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-585338502

I've managed to narrow the issue down to the data coming off the kinesis stream, by replacing the stream data with some test data. The original code:
```
if (!rdd.isEmpty()) {
  val json = rdd.map(record => new String(record))
  val dataFrame = spark.read.json(json)
  dataFrame.printSchema()
  dataFrame.show()

  val hudiTableName = "order"
  val hudiTablePath = path + hudiTableName

  val hudiOptions = Map[String, String](
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
    HoodieWriteConfig.TABLE_NAME -> hudiTableName,
    DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id")

  // Write data into the Hudi dataset
  dataFrame.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
}
```
I replaced
```
val dataFrame = spark.read.json(json)
```
with
```
val dataFrame = sparkContext.parallelize(Seq(Foo(1, Bar(1, "first")), Foo(2, Bar(2, "second")))).toDF()
```
and `select * from table` worked, as did the nested query `select id, bar.id, bar.name from table`. So at this stage it looks like there's an issue with the data and how it comes off the kinesis stream.

Update: I've pasted the `dataFrame.show()` output from the rdd off the stream here:
```
+--------+-------+--------------------+--------+--------------------+-------------+
|clientId|eventId|      eventTimestamp|      id|               order|  typeOfEvent|
+--------+-------+--------------------+--------+--------------------+-------------+
|     369| 115423|2020-02-12T15:54:...|34551840|[External, [[Aust...|order-created|
+--------+-------+--------------------+--------+--------------------+-------------+
```
Taking the data off the stream but ignoring the nested column `order`, the `select * from table` query works:
```
val jsonFrame = spark.read.json(json)
val dataFrame = jsonFrame.select("clientid", "eventId", "id", "typeOfEvent")
```
Other than being nested, is there something about the `order` column that would make this happen?
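In the same spirit as dropping `order` entirely, a less lossy workaround sketch (my own; the struct's field names are truncated in the show() output above, so the ones below are hypothetical) is to promote the nested fields you need to top-level simple columns before writing to Hudi, so nothing nested reaches Presto:
```
import org.apache.spark.sql.functions.col

val jsonFrame = spark.read.json(json)
// Hypothetical field names: promote what you need from the struct,
// then drop the nested column entirely.
val flattened = jsonFrame
  .withColumn("orderChannel", col("order.channel"))
  .withColumn("orderCountry", col("order.address.country"))
  .drop("order")
```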
[GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-585128793

Thanks @lamber-ken. I've updated my deploy step to remove all references to org.apache.spark:spark-avro_2.11:2.4.4
```
aws emr add-steps --cluster-id j-xx --steps Type=spark,Name=ScalaStream,Args=[\
--deploy-mode,cluster,\
--master,yarn,\
--jars,\'/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-streaming-kinesis-asl-assembly.jar\',\
--conf,spark.yarn.submit.waitAppCompletion=false,\
--conf,yarn.log-aggregation-enable=true,\
--conf,spark.dynamicAllocation.enabled=true,\
--conf,spark.cores.max=4,\
--conf,spark.network.timeout=300,\
--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\
--conf,spark.sql.hive.convertMetastoreParquet=false,\
--class,ScalaStream,\
s3://./simple-project_2.11-1.0.jar\
],ActionOnFailure=CONTINUE
```
and I receive the following error, where no hudi file is created for a streaming record
```
20/02/12 09:56:45 INFO BlockManagerInfo: Removed broadcast_20_piece0 on ip-10-10-10-212.ap-southeast-2.compute.internal:33453 in memory (size: 7.9 KB, free: 2.6 GB)
Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/spark/sql/avro/SchemaConverters$
	at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:80)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:81)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at ScalaStream$.handleOrderCreated(stream.scala:39)
	at ScalaStream$$anonfun$main$1.apply(stream.scala:110)
	at ScalaStream$$anonfun$main$1.apply(stream.scala:82)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
```
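`org.apache.spark.sql.avro.SchemaConverters` ships in the separate spark-avro module, and the top stack frame shows Hudi's `AvroConversionUtils` calling into it when converting the DataFrame schema, so removing the package would explain this `NoClassDefFoundError`. A sketch of the deploy step with spark-avro kept alongside the bundled jars (not verified on this cluster; assumes Spark 2.4.4, matching the build.sbt above):
```
aws emr add-steps --cluster-id j-xx --steps Type=spark,Name=ScalaStream,Args=[\
--deploy-mode,cluster,\
--master,yarn,\
--jars,\'/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-streaming-kinesis-asl-assembly.jar\',\
--packages,\'org.apache.spark:spark-avro_2.11:2.4.4\',\
--conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\
--conf,spark.sql.hive.convertMetastoreParquet=false,\
--class,ScalaStream,\
s3://./simple-project_2.11-1.0.jar\
],ActionOnFailure=CONTINUE
```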