Thanks Mich and Marco for your help. I have created a ticket on the dev channel to look into it. Here is the issue: https://issues.apache.org/jira/browse/SPARK-14031
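For anyone who wants to rerun the same timing test on a 2.x build, here is a minimal sketch using the built-in CSV source instead of the spark-csv package. It assumes a 2.x spark-shell where `spark` is the predefined SparkSession; the input path is a placeholder:

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) / 1e9 + " s")
  result
}

// read the 2008 data with the built-in CSV reader (path is a placeholder)
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("/path/to/2008.csv")

val selected = df.select("Year", "Cancelled")

// time only the CSV write
time(selected.write.option("header", "true").csv("output2x.csv"))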
On Sun, Mar 20, 2016 at 2:57 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Hi Vincent,
>
> I downloaded the CSV file and did the test.
>
> Spark version 1.5.2
>
> The full code is as follows, with minor changes to delete the yearAndCancelled.parquet
> and output.csv files if they already exist.
>
> //$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
>
> def TimeTaken[R](block: => R): R = {
>   val t0 = System.nanoTime()
>   val result = block // call-by-name
>   val t1 = System.nanoTime()
>   println("Elapsed time: " + (t1 - t0) + "ns")
>   result
> }
>
> //
> // Get a DF first based on the Databricks CSV library
> //
> val df = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load("/data/stg2/2008.csv.bz2")
> val df_1 = df.withColumnRenamed("Year","oldYear")
> val df_2 = df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
>
> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, newType:String) = {
>   val df_1 = df.withColumnRenamed(name, "swap")
>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
> }
>
> val df_3 = convertColumn(df_2, "ArrDelay", "int")
> val df_4 = convertColumn(df_2, "DepDelay", "int")
>
> // clean up the files in the HDFS directory first, if they exist
> val hadoopConf = new org.apache.hadoop.conf.Configuration()
> val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://rhes564:9000"), hadoopConf)
> val output1 = "hdfs://rhes564:9000/user/hduser/yearAndCancelled.parquet"
> try { hdfs.delete(new org.apache.hadoop.fs.Path(output1), true) } catch { case _ : Throwable => { } }
> val output2 = "hdfs://rhes564:9000/user/hduser/output.csv"
> try { hdfs.delete(new org.apache.hadoop.fs.Path(output2), true) } catch { case _ : Throwable => { } }
>
> // test that the write to parquet is fast
> df_4.select("Year", "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>
> val selectedData = df_4.select("Year", "Cancelled")
> val howLong = TimeTaken(selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("output.csv"))
>
> println ("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> sys.exit()
>
> My results are as follows:
>
> Started at
> [20/03/2016 10:02:02.02]
>
> Elapsed time: 63984582000ns
> howLong: Unit = ()
> Finished at
> [20/03/2016 10:04:59.59]
>
> So the whole job finished in just under 3 minutes. The elapsed time for
> saving output.csv was 63 seconds. That CSV file has 7,009,728 rows.
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> On 19 March 2016 at 22:36, Vincent Ohprecio <ohpre...@gmail.com> wrote:
>
>> Parquet works super fast but the write to csv took an hour. Another test
>> with 1.5 was fast. I'm gonna try a few more setups to test. I'm
>> testing 2.0.
>>
>> Thanks for your help
>>
>> Sent from my iDevice
>>
>> On Mar 19, 2016, at 3:30 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>> Hi Vince,
>>
>> We had a similar case a while back.
I tried two solutions, in both Spark on the Hive metastore and Hive on the Spark engine.
>>
>> Hive version 2
>> Spark as Hive engine: 1.3.1
>>
>> Basically:
>>
>> --1 Move the .CSV data into HDFS
>> --2 Create an external table (all columns as string)
>> --3 Create the ORC table (majority int)
>> --4 Insert the data from the external table into the Hive ORC table, compressed as zlib
>>
>> ORC seems to be a good candidate in this case, as a simple insert/select
>> from the external table to ORC takes no time. I bucketed the ORC table and marked
>> it as transactional in case one needs to make a correction to it (not
>> really needed).
>>
>> The whole process was timestamped; it took about 5 minutes to complete, and
>> there were 7,009,728 rows in total.
>>
>> +-------------------------+--+
>> |        starttime        |
>> +-------------------------+--+
>> | 19/03/2016 22:21:19.19  |
>> +-------------------------+--+
>>
>> +-------------------------+--+
>> |         endtime         |
>> +-------------------------+--+
>> | 19/03/2016 22:26:12.12  |
>> +-------------------------+--+
>>
>> This is the code. I will try the Spark code later with parquet.
>>
>> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS StartTime;
>> set hive.exec.reducers.max=256;
>> use test;
>> --set hive.execution.engine=mr;
>> --2)
>> DROP TABLE IF EXISTS stg_t2;
>> CREATE EXTERNAL TABLE stg_t2 (
>>   Year string
>> , Month string
>> , DayofMonth string
>> , DayOfWeek string
>> , DepTime string
>> , CRSDepTime string
>> , ArrTime string
>> , CRSArrTime string
>> , UniqueCarrier string
>> , FlightNum string
>> , TailNum string
>> , ActualElapsedTime string
>> , CRSElapsedTime string
>> , AirTime string
>> , ArrDelay string
>> , DepDelay string
>> , Origin string
>> , Dest string
>> , Distance string
>> , TaxiIn string
>> , TaxiOut string
>> , Cancelled string
>> , CancellationCode string
>> , Diverted string
>> , CarrierDelay string
>> , WeatherDelay string
>> , NASDelay string
>> , SecurityDelay string
>> , LateAircraftDelay string
>> )
>> COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
>> ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
>> STORED AS TEXTFILE
>> LOCATION 'hdfs://rhes564:9000/data/stg2'
>> TBLPROPERTIES ("skip.header.line.count"="1")
>> ;
>> --3)
>> DROP TABLE IF EXISTS t2008;
>> CREATE TABLE t2008 (
>>   Year int
>> , Month int
>> , DayofMonth int
>> , DayOfWeek int
>> , DepTime string
>> , CRSDepTime string
>> , ArrTime string
>> , CRSArrTime string
>> , UniqueCarrier string
>> , FlightNum int
>> , TailNum int
>> , ActualElapsedTime int
>> , CRSElapsedTime int
>> , AirTime int
>> , ArrDelay int
>> , DepDelay int
>> , Origin string
>> , Dest string
>> , Distance int
>> , TaxiIn int
>> , TaxiOut int
>> , Cancelled string
>> , CancellationCode string
>> , Diverted string
>> , CarrierDelay int
>> , WeatherDelay int
>> , NASDelay int
>> , SecurityDelay int
>> , LateAircraftDelay int
>> )
>> COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
>> CLUSTERED BY (Year, Month, DayofMonth, DayOfWeek, DepTime) INTO 256 BUCKETS
>> STORED AS ORC
>> TBLPROPERTIES ( "orc.compress"="ZLIB",
>> "transactional"="true")
>> ;
>> --4) Put data in the target table:
-- do the conversion and ignore empty rows
>> INSERT INTO TABLE t2008
>> SELECT
>>   *
>> FROM
>>   stg_t2
>> ;
>> --select count(1) from t2008
>> ;
>> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS EndTime;
>> !exit
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>> On 19 March 2016 at 19:18, Vincent Ohprecio <ohpre...@gmail.com> wrote:
>>
>>> For some reason, writing data from the Spark shell to CSV using the `csv
>>> package` takes almost an hour to dump to disk. Am I going crazy or did I do
>>> this wrong? I tried writing to parquet first and it is fast as normal.
>>>
>>> On my MacBook Pro (16 GB, 2.2 GHz Intel Core i7, 1 TB) the machine's CPUs
>>> go crazy and it sounds like it's taking off like a plane ... lol
>>>
>>> Here is the code if anyone wants to experiment:
>>>
>>> // ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
>>> //
>>> // version 2.0.0-SNAPSHOT
>>> // Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
>>> // http://stat-computing.org/dataexpo/2009/the-data.html
>>>
>>> def time[R](block: => R): R = {
>>>   val t0 = System.nanoTime()
>>>   val result = block // call-by-name
>>>   val t1 = System.nanoTime()
>>>   println("Elapsed time: " + (t1 - t0) + "ns")
>>>   result
>>> }
>>>
>>> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/Users/employee/Downloads/2008.csv")
>>> val df_1 = df.withColumnRenamed("Year","oldYear")
>>> val df_2 = df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
>>>
>>> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, newType:String) = {
>>>   val df_1 = df.withColumnRenamed(name, "swap")
>>>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
>>> }
>>>
>>> val df_3 = convertColumn(df_2, "ArrDelay", "int")
>>> val df_4 = convertColumn(df_2, "DepDelay", "int")
>>>
>>> // test that the write to parquet is fast
>>> df_4.select("Year", "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>>>
>>> val selectedData = df_4.select("Year", "Cancelled")
>>>
>>> // time the CSV write with the helper defined above
>>> val howLong = time(selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("output.csv"))
>>>
>>> //scala> val howLong = time(selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("output.csv"))
>>> //Elapsed time: 3488272270000ns
>>> //howLong: Unit = ()
>>>
>>> https://gist.github.com/bigsnarfdude/581b780ce85d7aaecbcb
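As a rough back-of-the-envelope comparison of the two timings quoted above: the 1.5.2 run wrote the 7,009,728 rows to CSV in about 64 s (63,984,582,000 ns), roughly 110,000 rows per second, whereas the 2.0.0-SNAPSHOT run took about 3,488 s (3,488,272,270,000 ns) for the same data, roughly 2,000 rows per second, which matches the "almost an hour" reported in the original question.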
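Step 1 of the Hive workflow above (moving the .CSV data into HDFS) has no code in the thread; here is a minimal sketch from the Spark shell that reuses the Hadoop FileSystem API already used for the deletes. The local source path is a placeholder; the HDFS URI and target directory are the ones the external table stg_t2 points at:

val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://rhes564:9000"), hadoopConf)
// copy the local file into the LOCATION directory of the external table
hdfs.copyFromLocalFile(
  new org.apache.hadoop.fs.Path("file:///tmp/2008.csv.bz2"),        // placeholder local source
  new org.apache.hadoop.fs.Path("hdfs://rhes564:9000/data/stg2/"))  // external table LOCATION

The same copy can of course be done from the command line with hdfs dfs -put.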