Hi Vince,

We had a similar case a while back. I tried two approaches: Spark against the Hive metastore, and Hive with Spark as its execution engine.
Hive version 2, with Spark 1.3.1 as the Hive execution engine.

Basically:

--1 Move the .csv data into HDFS
--2 Create an external table (all columns as string)
--3 Create the ORC table (majority of columns as int)
--4 Insert the data from the external table into the Hive ORC table, compressed as ZLIB

ORC seems to be a good candidate in this case, as a simple insert/select from the external table into ORC takes no time. I bucketed the ORC table and marked it as transactional in case one needs to make a correction to it (not really needed). The whole process was timestamped and it took about 5 minutes to complete, for 7,009,728 rows in total.

+-------------------------+--+
|        starttime        |
+-------------------------+--+
| 19/03/2016 22:21:19.19  |
+-------------------------+--+

+-------------------------+--+
|         endtime         |
+-------------------------+--+
| 19/03/2016 22:26:12.12  |
+-------------------------+--+

This is the code. I will try the Spark code later with Parquet.

select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS StartTime;
set hive.exec.reducers.max=256;
use test;
--set hive.execution.engine=mr;

--2) Create the staging external table over the csv file
DROP TABLE IF EXISTS stg_t2;
CREATE EXTERNAL TABLE stg_t2
(
  Year              string
, Month             string
, DayofMonth        string
, DayOfWeek         string
, DepTime           string
, CRSDepTime        string
, ArrTime           string
, CRSArrTime        string
, UniqueCarrier     string
, FlightNum         string
, TailNum           string
, ActualElapsedTime string
, CRSElapsedTime    string
, AirTime           string
, ArrDelay          string
, DepDelay          string
, Origin            string
, Dest              string
, Distance          string
, TaxiIn            string
, TaxiOut           string
, Cancelled         string
, CancellationCode  string
, Diverted          string
, CarrierDelay      string
, WeatherDelay      string
, NASDelay          string
, SecurityDelay     string
, LateAircraftDelay string
)
COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'hdfs://rhes564:9000/data/stg2'
TBLPROPERTIES ("skip.header.line.count"="1")
;

--3) Create the bucketed, transactional ORC table
DROP TABLE IF EXISTS t2008;
CREATE TABLE t2008
(
  Year              int
, Month             int
, DayofMonth        int
, DayOfWeek        int
, DepTime           string
, CRSDepTime        string
, ArrTime           string
, CRSArrTime        string
, UniqueCarrier     string
, FlightNum         int
, TailNum           int
, ActualElapsedTime int
, CRSElapsedTime    int
, AirTime           int
, ArrDelay          int
, DepDelay          int
, Origin            string
, Dest              string
, Distance          int
, TaxiIn            int
, TaxiOut           int
, Cancelled         string
, CancellationCode  string
, Diverted          string
, CarrierDelay      int
, WeatherDelay      int
, NASDelay          int
, SecurityDelay     int
, LateAircraftDelay int
)
COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
CLUSTERED BY (Year, Month, DayofMonth, DayOfWeek, DepTime) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ("orc.compress"="ZLIB", "transactional"="true")
;

--4) Put the data in the target table, doing the conversion and ignoring empty rows
INSERT INTO TABLE t2008
SELECT * FROM stg_t2
;
--select count(1) from t2008;

select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS EndTime;
!exit
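On the Spark side, this is roughly the shape of what I intend to try (a minimal, untested sketch: it assumes a Spark 1.4+ HiveContext and the spark-csv package; the casts are shortened to two columns for brevity, and the output directory name t2008_orc is made up):

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)

// read the raw csv with every column as string, like the stg_t2 external table above
val stg = hc.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load("hdfs://rhes564:9000/data/stg2")

// cast the numeric columns; only two shown here, the rest follow the same pattern
val typed = stg.select(
  stg("Year").cast("int").as("Year"),
  stg("Month").cast("int").as("Month")
)

// write out as ORC files; ORC's default compression is ZLIB, matching the DDL above
typed.write.format("orc").save("hdfs://rhes564:9000/data/t2008_orc")

Whether that beats the straight Hive insert/select I will only know once I have timed it.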
HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com


On 19 March 2016 at 19:18, Vincent Ohprecio <ohpre...@gmail.com> wrote:

> For some reason writing data from the Spark shell to csv using the `csv
> package` takes almost an hour to dump to disk. Am I going crazy or did I
> do this wrong? I tried writing to parquet first and it's fast as normal.
>
> On my Macbook Pro (16 GB, 2.2 GHz Intel Core i7, 1 TB) the machine's CPUs
> go crazy and it sounds like it's taking off like a plane ... lol
>
> Here is the code if anyone wants to experiment:
>
> // ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
> //
> // version 2.0.0-SNAPSHOT
> // Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
> // http://stat-computing.org/dataexpo/2009/the-data.html
>
> def time[R](block: => R): R = {
>   val t0 = System.nanoTime()
>   val result = block // call-by-name
>   val t1 = System.nanoTime()
>   println("Elapsed time: " + (t1 - t0) + "ns")
>   result
> }
>
> val df = sqlContext.read.format("com.databricks.spark.csv")
>   .option("header", "true")
>   .load("/Users/employee/Downloads/2008.csv")
>
> val df_1 = df.withColumnRenamed("Year", "oldYear")
> val df_2 = df_1.withColumn("Year", df_1.col("oldYear").cast("int")).drop("oldYear")
>
> def convertColumn(df: org.apache.spark.sql.DataFrame, name: String, newType: String) = {
>   val df_1 = df.withColumnRenamed(name, "swap")
>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
> }
>
> val df_3 = convertColumn(df_2, "ArrDelay", "int")
> val df_4 = convertColumn(df_3, "DepDelay", "int")
>
> // test write to parquet is fast
> df_4.select("Year", "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>
> val selectedData = df_4.select("Year", "Cancelled")
>
> val howLong = time(selectedData.write.format("com.databricks.spark.csv")
>   .option("header", "true").save("output.csv"))
>
> //scala> val howLong = time(selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("output.csv"))
> //Elapsed time: 3488272270000ns
> //howLong: Unit = ()
>
> https://gist.github.com/bigsnarfdude/581b780ce85d7aaecbcb
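PS. Regarding the slow csv write itself: bear in mind that each save replays the whole lineage, so the timed step also re-reads and re-parses the source csv, and the output partitioning is whatever the read produced. Isolating those helps tell whether the csv writer is really the bottleneck. A minimal, untested sketch, reusing your time() helper and df_4 from above; the cache(), the partition count of 8 and the output2.csv path are illustrative assumptions:

// cache the projection so the timed write does not re-parse the source csv
val selectedData = df_4.select("Year", "Cancelled").cache()
selectedData.count()  // an action to materialise the cache before timing

// spread the write over a fixed number of partitions (8 is arbitrary here)
val howLong = time(
  selectedData.repartition(8)
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
    .save("output2.csv")  // new path, since save() fails if the directory exists
)

If the cached, repartitioned write is still that slow, then it is worth looking at the csv package itself.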