Thanks Mich and Marco for your help. I have created a ticket to look into
it on dev channel.
Here is the issue https://issues.apache.org/jira/browse/SPARK-14031

On Sun, Mar 20, 2016 at 2:57 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Vincent,
>
> I downloads the CSV file and did the test.
>
> Spark version 1.5.2
>
> The full code as follows. Minor changes to delete yearAndCancelled.parquet
> and output.csv files if they are already created
>
> //$SPARK_HOME/bin/spark-shell --packages
> com.databricks:spark-csv_2.11:1.3.0
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> def TimeTaken[R](block: => R): R = {
>     val t0 = System.nanoTime()
>     val result = block    // call-by-name
>     val t1 = System.nanoTime()
>     println("Elapsed time: " + (t1 - t0) + "ns")
>     result
> }
> //
> // Get a DF first based on Databricks CSV libraries
> //
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg2/2008.csv.bz2")
> val df_1 = df.withColumnRenamed("Year","oldYear")
> val df_2 =
> df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
> newType:String) = {
>   val df_1 = df.withColumnRenamed(name, "swap")
>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
> }
> val df_3 = convertColumn(df_2, "ArrDelay", "int")
> val df_4 = convertColumn(df_2, "DepDelay", "int")
> // clean up the files in HDFS directory first if exist
> val hadoopConf = new org.apache.hadoop.conf.Configuration()
> val hdfs = org.apache.hadoop.fs.FileSystem.get(new
> java.net.URI("hdfs://rhes564:9000"), hadoopConf)
> val output1 = "hdfs://rhes564:9000/user/hduser/yearAndCancelled.parquet"
> try { hdfs.delete(new org.apache.hadoop.fs.Path(output1), true) } catch {
> case _ : Throwable => { } }
> val output2 = "hdfs://rhes564:9000/user/hduser/output.csv"
> try { hdfs.delete(new org.apache.hadoop.fs.Path(output2), true) } catch {
> case _ : Throwable => { } }
> // test write to parquet is fast
> df_4.select("Year",
> "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
> val selectedData = df_4.select("Year", "Cancelled")
> val howLong =
> TimeTaken(selectedData.write.format("com.databricks.spark.csv").option("header",
> "true").save("output.csv"))
> println ("\nFinished at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit()
>
>
> My results are as follows
>
> Started at
> [20/03/2016 10:02:02.02]
>
> Elapsed time: 63984582000ns
> howLong: Unit = ()
> Finished at
> [20/03/2016 10:04:59.59]
>
> So the whole job finished just under 3 minutes. The elapsed time for
> saving output.csv took 63 seconds. That CSV file has 7,009,728 rows
>
> HTH
>
>
>
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 19 March 2016 at 22:36, Vincent Ohprecio <ohpre...@gmail.com> wrote:
>
>> parquet works super fast but writes to csv took an hour.  another tested
>> with 1.5 and it was fast.  im gonna try a few more setups to test. im
>> testing 2.0
>>
>> thanks for your help
>>
>> Sent from my iDevice
>>
>> On Mar 19, 2016, at 3:30 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>> Hi Vince,
>>
>> We had a similar case a while back. I tried two solutions in both Spark
>> on Hive metastore and Hive on Spark engine.
>>
>> Hive version 2
>> Spark as Hive engine 1.3.1
>>
>> Basically
>>
>> --1 Move .CSV data into HDFS:
>> --2 Create an external table (all columns as string)
>> --3 Create the ORC table (majority Int)
>> --4 Insert the data from the external table to the Hive ORC table
>> compressed as zlib
>>
>> ORC seems to be in this case a good candidate as a simple insert/select
>> from external table to ORC takes no time. I bucketed ORC table and marked
>> it as transactional in case one needs to make a correction to it (not
>> really needed).
>>
>> The whole process was time stamped and it took 5 minutes to complete and
>> there were 7,009,728 rows in total.
>>
>>
>> +-------------------------+--+
>> |        starttime        |
>> +-------------------------+--+
>> | 19/03/2016 22:21:19.19  |
>> +-------------------------+--+
>>
>> +-------------------------+--+
>> |         endtime         |
>> +-------------------------+--+
>> | 19/03/2016 22:26:12.12  |
>> +-------------------------+--+
>>
>>
>>
>> This is the code. I will try spark code later with parquet
>>
>> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS
>> StartTime;
>> set hive.exec.reducers.max=256;
>> use test;
>> --set hive.execution.engine=mr;
>> --2)
>> DROP TABLE IF EXISTS stg_t2;
>> CREATE EXTERNAL TABLE stg_t2 (
>>    Year         string
>> ,  Month        string
>> ,  DayofMonth   string
>> ,  DayOfWeek    string
>> ,  DepTime      string
>> ,  CRSDepTime   string
>> ,  ArrTime      string
>> ,  CRSArrTime   string
>> ,  UniqueCarrier        string
>> ,  FlightNum    string
>> ,  TailNum      string
>> ,  ActualElapsedTime    string
>> ,  CRSElapsedTime       string
>> ,  AirTime      string
>> ,  ArrDelay     string
>> ,  DepDelay     string
>> ,  Origin       string
>> ,  Dest         string
>> ,  Distance     string
>> ,  TaxiIn       string
>> ,  TaxiOut      string
>> ,  Cancelled    string
>> ,  CancellationCode     string
>> ,  Diverted     string
>> ,  CarrierDelay         string
>> ,  WeatherDelay         string
>> ,  NASDelay     string
>> ,  SecurityDelay        string
>> ,  LateAircraftDelay    string
>> )
>> COMMENT 'from csv file from
>> http://stat-computing.org/dataexpo/2009/the-data.html, tear 2008'
>> ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
>> STORED AS TEXTFILE
>> LOCATION 'hdfs://rhes564:9000/data/stg2'
>> TBLPROPERTIES ("skip.header.line.count"="1")
>> ;
>> --3)
>> DROP TABLE IF EXISTS t2008;
>> CREATE TABLE t2008 (
>>    Year         int
>> ,  Month        int
>> ,  DayofMonth   int
>> ,  DayOfWeek    int
>> ,  DepTime      string
>> ,  CRSDepTime   string
>> ,  ArrTime      string
>> ,  CRSArrTime   string
>> ,  UniqueCarrier        string
>> ,  FlightNum    int
>> ,  TailNum      int
>> ,  ActualElapsedTime    int
>> ,  CRSElapsedTime       int
>> ,  AirTime      int
>> ,  ArrDelay     int
>> ,  DepDelay     int
>> ,  Origin       string
>> ,  Dest         string
>> ,  Distance     int
>> ,  TaxiIn       int
>> ,  TaxiOut      int
>> ,  Cancelled    string
>> ,  CancellationCode     string
>> ,  Diverted     string
>> ,  CarrierDelay         int
>> ,  WeatherDelay         int
>> ,  NASDelay     int
>> ,  SecurityDelay        int
>> ,  LateAircraftDelay    int
>> )
>> COMMENT 'from csv file from
>> http://stat-computing.org/dataexpo/2009/the-data.html, tear 2008'
>> CLUSTERED BY (Year, Month, DayofMonth, DayOfWeek, DepTime) INTO 256
>> BUCKETS
>> STORED AS ORC
>> TBLPROPERTIES ( "orc.compress"="ZLIB",
>> "transactional"="true")
>> ;
>> --4) Put data in target table. do the conversion and ignore empty rows
>> INSERT INTO TABLE t2008
>> SELECT
>>           *
>> FROM
>> stg_t2
>> ;
>> --select count(1) from t2008
>> ;
>> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS
>> EndTime;
>> !exit
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 19 March 2016 at 19:18, Vincent Ohprecio <ohpre...@gmail.com> wrote:
>>
>>>
>>> For some reason writing data from Spark shell to csv using the `csv
>>> package` takes almost an hour to dump to disk. Am I going crazy or did I do
>>> this wrong? I tried writing to parquet first and its fast as normal.
>>>
>>> On my Macbook Pro 16g - 2.2 GHz Intel Core i7 -1TB the machine CPU's
>>> goes crazy and it sounds like its taking off like a plane ... lol
>>>
>>> Here is the code if anyone wants to experiment:
>>>
>>> // ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
>>>
>>> //
>>>
>>> // version 2.0.0-SNAPSHOT
>>>
>>> // Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java
>>> 1.7.0_80)
>>>
>>> // http://stat-computing.org/dataexpo/2009/the-data.html
>>>
>>>
>>> def time[R](block: => R): R = {
>>>
>>>     val t0 = System.nanoTime()
>>>
>>>     val result = block    // call-by-name
>>>
>>>     val t1 = System.nanoTime()
>>>
>>>     println("Elapsed time: " + (t1 - t0) + "ns")
>>>
>>>     result
>>>
>>> }
>>>
>>>
>>> val df =
>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>>> "true").load("/Users/employee/Downloads/2008.csv")
>>>
>>> val df_1 = df.withColumnRenamed("Year","oldYear")
>>>
>>> val df_2 =
>>> df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
>>>
>>> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String,
>>> newType:String) = {
>>>
>>>   val df_1 = df.withColumnRenamed(name, "swap")
>>>
>>>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
>>>
>>> }
>>>
>>> val df_3 = convertColumn(df_2, "ArrDelay", "int")
>>>
>>> val df_4 = convertColumn(df_2, "DepDelay", "int")
>>>
>>>
>>> // test write to parquet is fast
>>>
>>> df_4.select("Year",
>>> "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>>>
>>>
>>> val selectedData = df_4.select("Year", "Cancelled")
>>>
>>>
>>>
>>> val howLong =
>>> Time(selectedData.write.format("com.databricks.spark.csv").option("header",
>>> "true").save("output.csv"))
>>>
>>>
>>> //scala> val howLong =
>>> time(selectedData.write.format("com.databricks.spark.csv").option("header",
>>> "true").save("output.csv"))
>>>
>>> //Elapsed time: 3488272270000ns
>>>
>>> //howLong: Unit = ()
>>>
>>> https://gist.github.com/bigsnarfdude/581b780ce85d7aaecbcb
>>>
>>>
>>>
>>>
>>>
>>
>

Reply via email to