Hi Vincent,
I downloads the CSV file and did the test.
Spark version 1.5.2
The full code as follows. Minor changes to delete yearAndCancelled.parquet
and output.csv files if they are already created
//$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
").collect.foreach(println)
def TimeTaken[R](block: => R): R = {
val t0 = System.nanoTime()
val result = block // call-by-name
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")
result
}
//
// Get a DF first based on Databricks CSV libraries
//
val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg2/2008.csv.bz2")
val df_1 = df.withColumnRenamed("Year","oldYear")
val df_2 =
df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
newType:String) = {
val df_1 = df.withColumnRenamed(name, "swap")
df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
}
val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_2, "DepDelay", "int")
// clean up the files in HDFS directory first if exist
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new
java.net.URI("hdfs://rhes564:9000"), hadoopConf)
val output1 = "hdfs://rhes564:9000/user/hduser/yearAndCancelled.parquet"
try { hdfs.delete(new org.apache.hadoop.fs.Path(output1), true) } catch {
case _ : Throwable => { } }
val output2 = "hdfs://rhes564:9000/user/hduser/output.csv"
try { hdfs.delete(new org.apache.hadoop.fs.Path(output2), true) } catch {
case _ : Throwable => { } }
// test write to parquet is fast
df_4.select("Year",
"Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
val selectedData = df_4.select("Year", "Cancelled")
val howLong =
TimeTaken(selectedData.write.format("com.databricks.spark.csv").option("header",
"true").save("output.csv"))
println ("\nFinished at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
").collect.foreach(println)
sys.exit()
My results are as follows
Started at
[20/03/2016 10:02:02.02]
Elapsed time: 63984582000ns
howLong: Unit = ()
Finished at
[20/03/2016 10:04:59.59]
So the whole job finished just under 3 minutes. The elapsed time for
saving output.csv
took 63 seconds. That CSV file has 7,009,728 rows
HTH
Dr Mich Talebzadeh
LinkedIn *
https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
http://talebzadehmich.wordpress.com
On 19 March 2016 at 22:36, Vincent Ohprecio <[email protected]> wrote:
> parquet works super fast but writes to csv took an hour. another tested
> with 1.5 and it was fast. im gonna try a few more setups to test. im
> testing 2.0
>
> thanks for your help
>
> Sent from my iDevice
>
> On Mar 19, 2016, at 3:30 PM, Mich Talebzadeh <[email protected]>
> wrote:
>
> Hi Vince,
>
> We had a similar case a while back. I tried two solutions in both Spark on
> Hive metastore and Hive on Spark engine.
>
> Hive version 2
> Spark as Hive engine 1.3.1
>
> Basically
>
> --1 Move .CSV data into HDFS:
> --2 Create an external table (all columns as string)
> --3 Create the ORC table (majority Int)
> --4 Insert the data from the external table to the Hive ORC table
> compressed as zlib
>
> ORC seems to be in this case a good candidate as a simple insert/select
> from external table to ORC takes no time. I bucketed ORC table and marked
> it as transactional in case one needs to make a correction to it (not
> really needed).
>
> The whole process was time stamped and it took 5 minutes to complete and
> there were 7,009,728 rows in total.
>
>
> +-------------------------+--+
> | starttime |
> +-------------------------+--+
> | 19/03/2016 22:21:19.19 |
> +-------------------------+--+
>
> +-------------------------+--+
> | endtime |
> +-------------------------+--+
> | 19/03/2016 22:26:12.12 |
> +-------------------------+--+
>
>
>
> This is the code. I will try spark code later with parquet
>
> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS
> StartTime;
> set hive.exec.reducers.max=256;
> use test;
> --set hive.execution.engine=mr;
> --2)
> DROP TABLE IF EXISTS stg_t2;
> CREATE EXTERNAL TABLE stg_t2 (
> Year string
> , Month string
> , DayofMonth string
> , DayOfWeek string
> , DepTime string
> , CRSDepTime string
> , ArrTime string
> , CRSArrTime string
> , UniqueCarrier string
> , FlightNum string
> , TailNum string
> , ActualElapsedTime string
> , CRSElapsedTime string
> , AirTime string
> , ArrDelay string
> , DepDelay string
> , Origin string
> , Dest string
> , Distance string
> , TaxiIn string
> , TaxiOut string
> , Cancelled string
> , CancellationCode string
> , Diverted string
> , CarrierDelay string
> , WeatherDelay string
> , NASDelay string
> , SecurityDelay string
> , LateAircraftDelay string
> )
> COMMENT 'from csv file from
> http://stat-computing.org/dataexpo/2009/the-data.html, tear 2008'
> ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
> STORED AS TEXTFILE
> LOCATION 'hdfs://rhes564:9000/data/stg2'
> TBLPROPERTIES ("skip.header.line.count"="1")
> ;
> --3)
> DROP TABLE IF EXISTS t2008;
> CREATE TABLE t2008 (
> Year int
> , Month int
> , DayofMonth int
> , DayOfWeek int
> , DepTime string
> , CRSDepTime string
> , ArrTime string
> , CRSArrTime string
> , UniqueCarrier string
> , FlightNum int
> , TailNum int
> , ActualElapsedTime int
> , CRSElapsedTime int
> , AirTime int
> , ArrDelay int
> , DepDelay int
> , Origin string
> , Dest string
> , Distance int
> , TaxiIn int
> , TaxiOut int
> , Cancelled string
> , CancellationCode string
> , Diverted string
> , CarrierDelay int
> , WeatherDelay int
> , NASDelay int
> , SecurityDelay int
> , LateAircraftDelay int
> )
> COMMENT 'from csv file from
> http://stat-computing.org/dataexpo/2009/the-data.html, tear 2008'
> CLUSTERED BY (Year, Month, DayofMonth, DayOfWeek, DepTime) INTO 256 BUCKETS
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB",
> "transactional"="true")
> ;
> --4) Put data in target table. do the conversion and ignore empty rows
> INSERT INTO TABLE t2008
> SELECT
> *
> FROM
> stg_t2
> ;
> --select count(1) from t2008
> ;
> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS
> EndTime;
> !exit
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 19 March 2016 at 19:18, Vincent Ohprecio <[email protected]> wrote:
>
>>
>> For some reason writing data from Spark shell to csv using the `csv
>> package` takes almost an hour to dump to disk. Am I going crazy or did I do
>> this wrong? I tried writing to parquet first and its fast as normal.
>>
>> On my Macbook Pro 16g - 2.2 GHz Intel Core i7 -1TB the machine CPU's goes
>> crazy and it sounds like its taking off like a plane ... lol
>>
>> Here is the code if anyone wants to experiment:
>>
>> // ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
>>
>> //
>>
>> // version 2.0.0-SNAPSHOT
>>
>> // Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java
>> 1.7.0_80)
>>
>> // http://stat-computing.org/dataexpo/2009/the-data.html
>>
>>
>> def time[R](block: => R): R = {
>>
>> val t0 = System.nanoTime()
>>
>> val result = block // call-by-name
>>
>> val t1 = System.nanoTime()
>>
>> println("Elapsed time: " + (t1 - t0) + "ns")
>>
>> result
>>
>> }
>>
>>
>> val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").load("/Users/employee/Downloads/2008.csv")
>>
>> val df_1 = df.withColumnRenamed("Year","oldYear")
>>
>> val df_2 =
>> df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
>>
>> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
>> newType:String) = {
>>
>> val df_1 = df.withColumnRenamed(name, "swap")
>>
>> df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
>>
>> }
>>
>> val df_3 = convertColumn(df_2, "ArrDelay", "int")
>>
>> val df_4 = convertColumn(df_2, "DepDelay", "int")
>>
>>
>> // test write to parquet is fast
>>
>> df_4.select("Year",
>> "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>>
>>
>> val selectedData = df_4.select("Year", "Cancelled")
>>
>>
>>
>> val howLong =
>> Time(selectedData.write.format("com.databricks.spark.csv").option("header",
>> "true").save("output.csv"))
>>
>>
>> //scala> val howLong =
>> time(selectedData.write.format("com.databricks.spark.csv").option("header",
>> "true").save("output.csv"))
>>
>> //Elapsed time: 3488272270000ns
>>
>> //howLong: Unit = ()
>>
>> https://gist.github.com/bigsnarfdude/581b780ce85d7aaecbcb
>>
>>
>>
>>
>>
>