Hi,

I will try the same settings as you tomorrow to see if I can reproduce the same issues. Will report back once done. Thanks.

On 20 Mar 2016 3:50 pm, "Vincent Ohprecio" <[email protected]> wrote:
Thanks Mich and Marco for your help. I have created a ticket on the dev channel to look into it. Here is the issue: https://issues.apache.org/jira/browse/SPARK-14031

On Sun, Mar 20, 2016 at 2:57 AM, Mich Talebzadeh <[email protected]> wrote:

Hi Vincent,

I downloaded the CSV file and did the test.

Spark version 1.5.2

The full code is as follows. The only changes are to delete the yearAndCancelled.parquet and output.csv files if they already exist.

//$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)

def TimeTaken[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block    // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
  result
}

//
// Get a DataFrame first, based on the Databricks CSV library
//
val df = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load("/data/stg2/2008.csv.bz2")
val df_1 = df.withColumnRenamed("Year", "oldYear")
val df_2 = df_1.withColumn("Year", df_1.col("oldYear").cast("int")).drop("oldYear")

def convertColumn(df: org.apache.spark.sql.DataFrame, name: String, newType: String) = {
  val df_1 = df.withColumnRenamed(name, "swap")
  df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
}

val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_3, "DepDelay", "int")

// Clean up the files in the HDFS directory first if they already exist
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://rhes564:9000"), hadoopConf)
val output1 = "hdfs://rhes564:9000/user/hduser/yearAndCancelled.parquet"
try { hdfs.delete(new org.apache.hadoop.fs.Path(output1), true) } catch { case _ : Throwable => { } }
val output2 = "hdfs://rhes564:9000/user/hduser/output.csv"
try { hdfs.delete(new org.apache.hadoop.fs.Path(output2), true) } catch { case _ : Throwable => { } }

// Test that the write to parquet is fast
df_4.select("Year", "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")

val selectedData = df_4.select("Year", "Cancelled")
val howLong = TimeTaken(selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("output.csv"))

println("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')").collect.foreach(println)
sys.exit()

My results are as follows:

Started at
[20/03/2016 10:02:02.02]

Elapsed time: 63984582000ns
howLong: Unit = ()

Finished at
[20/03/2016 10:04:59.59]

So the whole job finished in just under 3 minutes, and saving output.csv took roughly 64 seconds (63,984,582,000 ns). That CSV file has 7,009,728 rows.
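A minimal sketch of a variation, not run for the numbers above: wrapping the parquet write in the same TimeTaken helper would time both formats the same way. It assumes selectedData and TimeTaken from the code above; the *_timed output paths are hypothetical, chosen only to avoid overwriting the files already written.

// Sketch only: time the parquet and CSV writes with the same helper.
// selectedData and TimeTaken are assumed from the session above; the
// *_timed paths are made up to avoid clobbering the earlier output.
val parquetWrite = TimeTaken(
  selectedData.write.format("parquet").save("yearAndCancelled_timed.parquet")
)
val csvWrite = TimeTaken(
  selectedData.write.format("com.databricks.spark.csv")
    .option("header", "true")
    .save("output_timed.csv")
)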
HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 19 March 2016 at 22:36, Vincent Ohprecio <[email protected]> wrote:

Parquet works super fast, but the write to CSV took an hour. Another test with 1.5 was fast. I'm going to try a few more setups to test; I'm testing 2.0.

Thanks for your help.

Sent from my iDevice

On Mar 19, 2016, at 3:30 PM, Mich Talebzadeh <[email protected]> wrote:

Hi Vince,

We had a similar case a while back. I tried two solutions, both Spark on a Hive metastore and Hive on the Spark engine.

Hive version 2
Spark as the Hive engine: 1.3.1

Basically:

--1 Move the .CSV data into HDFS
--2 Create an external table (all columns as string)
--3 Create the ORC table (majority int)
--4 Insert the data from the external table into the Hive ORC table, compressed as zlib

ORC seems to be a good candidate in this case, as a simple insert/select from the external table into ORC takes no time. I bucketed the ORC table and marked it as transactional in case one needs to make a correction to it (not really needed).

The whole process was timestamped; it took about 5 minutes to complete and there were 7,009,728 rows in total.

+--------------------------+
|        starttime         |
+--------------------------+
|  19/03/2016 22:21:19.19  |
+--------------------------+

+--------------------------+
|         endtime          |
+--------------------------+
|  19/03/2016 22:26:12.12  |
+--------------------------+

This is the code. I will try the Spark code with parquet later.

select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS StartTime;
set hive.exec.reducers.max=256;
use test;
--set hive.execution.engine=mr;
--2)
DROP TABLE IF EXISTS stg_t2;
CREATE EXTERNAL TABLE stg_t2 (
  Year string
, Month string
, DayofMonth string
, DayOfWeek string
, DepTime string
, CRSDepTime string
, ArrTime string
, CRSArrTime string
, UniqueCarrier string
, FlightNum string
, TailNum string
, ActualElapsedTime string
, CRSElapsedTime string
, AirTime string
, ArrDelay string
, DepDelay string
, Origin string
, Dest string
, Distance string
, TaxiIn string
, TaxiOut string
, Cancelled string
, CancellationCode string
, Diverted string
, CarrierDelay string
, WeatherDelay string
, NASDelay string
, SecurityDelay string
, LateAircraftDelay string
)
COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION 'hdfs://rhes564:9000/data/stg2'
TBLPROPERTIES ("skip.header.line.count"="1")
;
--3)
DROP TABLE IF EXISTS t2008;
CREATE TABLE t2008 (
  Year int
, Month int
, DayofMonth int
, DayOfWeek int
, DepTime string
, CRSDepTime string
, ArrTime string
, CRSArrTime string
, UniqueCarrier string
, FlightNum int
, TailNum int
, ActualElapsedTime int
, CRSElapsedTime int
, AirTime int
, ArrDelay int
, DepDelay int
, Origin string
, Dest string
, Distance int
, TaxiIn int
, TaxiOut int
, Cancelled string
, CancellationCode string
, Diverted string
, CarrierDelay int
, WeatherDelay int
, NASDelay int
, SecurityDelay int
, LateAircraftDelay int
)
COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
CLUSTERED BY (Year, Month, DayofMonth, DayOfWeek, DepTime) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ("orc.compress"="ZLIB", "transactional"="true")
;
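-- Note (an assumption about the behaviour, not stated in the thread):
-- OpenCSVSerde exposes every column of stg_t2 as a string, so the INSERT
-- below relies on Hive's implicit string-to-int casts for the int columns
-- of t2008. Values that cannot be cast (for example an alphanumeric
-- TailNum) are loaded as NULL rather than failing the insert.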
( "orc.compress"="ZLIB", >>> "transactional"="true") >>> ; >>> --4) Put data in target table. do the conversion and ignore empty rows >>> INSERT INTO TABLE t2008 >>> SELECT >>> * >>> FROM >>> stg_t2 >>> ; >>> --select count(1) from t2008 >>> ; >>> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS >>> EndTime; >>> !exit >>> >>> HTH >>> >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>> http://talebzadehmich.wordpress.com >>> >>> >>> >>> On 19 March 2016 at 19:18, Vincent Ohprecio <[email protected]> wrote: >>> >>>> >>>> For some reason writing data from Spark shell to csv using the `csv >>>> package` takes almost an hour to dump to disk. Am I going crazy or did I do >>>> this wrong? I tried writing to parquet first and its fast as normal. >>>> >>>> On my Macbook Pro 16g - 2.2 GHz Intel Core i7 -1TB the machine CPU's >>>> goes crazy and it sounds like its taking off like a plane ... lol >>>> >>>> Here is the code if anyone wants to experiment: >>>> >>>> // ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0 >>>> >>>> // >>>> >>>> // version 2.0.0-SNAPSHOT >>>> >>>> // Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java >>>> 1.7.0_80) >>>> >>>> // http://stat-computing.org/dataexpo/2009/the-data.html >>>> >>>> >>>> def time[R](block: => R): R = { >>>> >>>> val t0 = System.nanoTime() >>>> >>>> val result = block // call-by-name >>>> >>>> val t1 = System.nanoTime() >>>> >>>> println("Elapsed time: " + (t1 - t0) + "ns") >>>> >>>> result >>>> >>>> } >>>> >>>> >>>> val df = >>>> sqlContext.read.format("com.databricks.spark.csv").option("header", >>>> "true").load("/Users/employee/Downloads/2008.csv") >>>> >>>> val df_1 = df.withColumnRenamed("Year","oldYear") >>>> >>>> val df_2 = >>>> df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear") >>>> >>>> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, >>>> newType:String) = { >>>> >>>> val df_1 = df.withColumnRenamed(name, "swap") >>>> >>>> df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap") >>>> >>>> } >>>> >>>> val df_3 = convertColumn(df_2, "ArrDelay", "int") >>>> >>>> val df_4 = convertColumn(df_2, "DepDelay", "int") >>>> >>>> >>>> // test write to parquet is fast >>>> >>>> df_4.select("Year", >>>> "Cancelled").write.format("parquet").save("yearAndCancelled.parquet") >>>> >>>> >>>> val selectedData = df_4.select("Year", "Cancelled") >>>> >>>> >>>> >>>> val howLong = >>>> Time(selectedData.write.format("com.databricks.spark.csv").option("header", >>>> "true").save("output.csv")) >>>> >>>> >>>> //scala> val howLong = >>>> time(selectedData.write.format("com.databricks.spark.csv").option("header", >>>> "true").save("output.csv")) >>>> >>>> //Elapsed time: 3488272270000ns >>>> >>>> //howLong: Unit = () >>>> >>>> https://gist.github.com/bigsnarfdude/581b780ce85d7aaecbcb >>>> >>>> >>>> >>>> >>>> >>> >> >
