Re: Re: How to upsert on HDFS

2019-12-24 Thread nishith agarwal
Mayu,

There shouldn't be a difference between upserting 1 million and 10 million
records. This does seem to be a problem with the way the dataset is being
queried. Could you please give that link a read, and if you still run into
issues, we can help you.
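
A minimal sketch of such a check (assuming Spark 2.x with the hudi-spark bundle
on the classpath; the base path is the HDFS path from your earlier mail): it uses
Hudi's _hoodie_record_key and _hoodie_commit_time metadata columns to see whether
the extra rows are truly duplicate keys, or older file versions showing up in the
query.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object CheckDuplicates {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HudiDuplicateCheck")
      .master("local")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Same base path the writer used; adjust as needed.
    val basePath = "hdfs://172.16.44.28:8020/flink/hudi"

    // Snapshot read through the Hudi datasource.
    val df = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")

    // Record keys that appear more than once, with the number of distinct commits
    // that produced them. The same key under several _hoodie_commit_time values
    // usually means older file versions are being read, not that the table itself
    // contains duplicate keys.
    df.groupBy("_hoodie_record_key")
      .agg(countDistinct("_hoodie_commit_time").as("versions"), count(lit(1)).as("rows"))
      .filter(col("rows") > 1)
      .show(20, false)
  }
}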

-Nishith

On Tue, Dec 24, 2019 at 11:36 PM ma...@bonc.com.cn wrote:

> Thank you for answering my question.
> I found that this is not a problem with HDFS: the upsert works when the
> table has 1 million records, but the problem appears at 10 million.
>
>
>
> ma...@bonc.com.cn
>
> From: nishith agarwal
> Date: 2019-12-25 15:19
> To: dev
> Subject: Re: How to upsert on HDFS
> Hello Mayu,
>
> To query Hudi tables, you need to register them against the correct
> HudiInputFormat. For more information on querying the Hudi table, please
> read the following documentation:
> https://hudi.apache.org/querying_data.html
> The duplicates might be happening due to the absence of the input format,
> which is what chooses the right file/update to pick.
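>
> As a rough illustration of that point (a sketch, assuming Spark 2.x and the HDFS
> path from your mail): reading through the Hudi datasource applies the input-format
> logic and returns only the latest version of each record, while reading the raw
> parquet files underneath bypasses it and can surface superseded versions as
> apparent duplicates.
>
> import org.apache.spark.sql.SparkSession
>
> object QueryPathComparison {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder()
>       .appName("HudiQueryPathComparison")
>       .master("local")
>       .getOrCreate()
>
>     val basePath = "hdfs://172.16.44.28:8020/flink/hudi"
>
>     // Through the Hudi datasource: only the latest file slice per record is returned.
>     val hudiDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
>     println("hudi datasource count: " + hudiDF.count())
>
>     // Raw parquet under the same path: superseded file versions are read as well,
>     // so the same record key can show up more than once.
>     val rawDF = spark.read.parquet(basePath + "/*/*/*/*.parquet")
>     println("raw parquet count: " + rawDF.count())
>   }
> }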
>
> Thanks,
> Nishith
>
> On Tue, Dec 24, 2019 at 6:46 PM ma...@bonc.com.cn wrote:
>
> > Hello!
> > I want to upsert (update) a record in a Hudi dataset. It works when I run
> > against a local dataset (such as file:///e:/hudi_cow_table), but when I run
> > against HDFS (such as hdfs://172.16.44.28:8020/flink/hudi) the original
> > record is not overwritten and a new record is generated. Why?
> >
> > My program:
> >
> > import java.util.Collections
> >
> > import org.apache.spark.sql.SQLContext
> > import org.apache.spark.{SparkConf, SparkContext}
> >
> > object HudiUpdate {
> >   def main(args: Array[String]): Unit = {
> >     import org.apache.hudi.DataSourceWriteOptions._
> >     import org.apache.hudi.QuickstartUtils._
> >     import org.apache.hudi.config.HoodieWriteConfig._
> >     import org.apache.spark.sql.SaveMode._
> >
> >     import scala.collection.JavaConversions._
> >
> >     // initialization
> >     val conf = new SparkConf().setAppName("HudiTest")
> >       .setMaster("local")
> >     conf.set("spark.serializer",
> >       "org.apache.spark.serializer.KryoSerializer") // use the Kryo serialization library
> >     val sc = new SparkContext(conf)
> >     val spark = new SQLContext(sc)
> >
> >     val updateUUID = "079bd411-ca1e-4092-9523-0b694f9f41f2"
> >     //val updateUUID = args(0)
> >
> >     // set the table name and base path used to generate records for this guide
> >     val tableName = Constant.tableName
> >     //val basePath = Constant.hdfsPath
> >     val basePath = Constant.localPath
> >
> >     query(spark, basePath, updateUUID)
> >
> >     // generate a new trip sample, load it into a DataFrame, then write the
> >     // DataFrame into the Hudi dataset as follows
> >     val record = "{\"ts\": 0.0, \"uuid\": \"" + updateUUID + "\", " +
> >       "\"rider\": \"rider-213\", \"driver\": \"driver-213\", " +
> >       "\"begin_lat\": 0.4726905879569653, \"begin_lon\": 0.46157858450465483, " +
> >       "\"end_lat\": 0.754803407008858, \"end_lon\": 0.9671159942018241, " +
> >       "\"fare\": 34.158284716382845, \"partitionpath\": \"americas/brazil/sao_paulo\"}"
> >     println(record)
> >     val upsert = Collections.singletonList(record)
> >     //println("insert:" + System.currentTimeMillis())
> >     val df = spark.read.json(spark.sparkContext.parallelize(upsert, 1))
> >
> >     println("start:" + System.currentTimeMillis())
> >     df.write.format("org.apache.hudi").
> >       options(getQuickstartWriteConfigs).
> >       option(PRECOMBINE_FIELD_OPT_KEY, "ts").
> >       option(RECORDKEY_FIELD_OPT_KEY, "uuid").
> >       option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> >       option(TABLE_NAME, tableName).
> >       mode(Append).
> >       save(basePath)
> >     println("end:" + System.currentTimeMillis())
> >
> >     query(spark, basePath, updateUUID)
> >
> >     println("finish")
> >   }
> >
> >   def query(spark: SQLContext, basePath: String, updateUUID: String) = {
> >     val roViewDF = spark.
> >       read.
> >       format("org.apache.hudi").
> >       load(basePath + "/*/*/*/*")
> >     roViewDF.registerTempTable("hudi_ro_table")
> >     val df0 = spark.sql("select * from hudi_ro_table where uuid='" + updateUUID + "'")
> >     //df0.printSchema()
> >     println(df0.rdd.collect().mkString(" "))
> >   }
> > }
> >
> >
> >
> > ma...@bonc.com.cn
> >
>

