Hi,
I'll try the same settings as you tomorrow to see if I hit the same issue.
I'll report back once done.
Thanks
On 20 Mar 2016 3:50 pm, "Vincent Ohprecio" <[email protected]> wrote:

> Thanks Mich and Marco for your help. I have created a ticket on the dev
> channel to look into it.
> Here is the issue: https://issues.apache.org/jira/browse/SPARK-14031
>
> On Sun, Mar 20, 2016 at 2:57 AM, Mich Talebzadeh <
> [email protected]> wrote:
>
>> Hi Vincent,
>>
>> I downloaded the CSV file and ran the test.
>>
>> Spark version 1.5.2
>>
>> The full code is as follows. I made minor changes to delete
>> yearAndCancelled.parquet and output.csv if they already exist.
>>
>> // $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
>> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
>> def TimeTaken[R](block: => R): R = {
>>     val t0 = System.nanoTime()
>>     val result = block    // call-by-name
>>     val t1 = System.nanoTime()
>>     println("Elapsed time: " + (t1 - t0) + "ns")
>>     result
>> }
>> //
>> // Get a DF first based on Databricks CSV libraries
>> //
>> val df = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load("/data/stg2/2008.csv.bz2")
>> val df_1 = df.withColumnRenamed("Year","oldYear")
>> val df_2 = df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
>> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, newType:String) = {
>>   val df_1 = df.withColumnRenamed(name, "swap")
>>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
>> }
>> val df_3 = convertColumn(df_2, "ArrDelay", "int")
>> // chain from df_3 so the ArrDelay cast is not dropped
>> val df_4 = convertColumn(df_3, "DepDelay", "int")
>> // clean up the files in HDFS directory first if exist
>> val hadoopConf = new org.apache.hadoop.conf.Configuration()
>> val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://rhes564:9000"), hadoopConf)
>> val output1 = "hdfs://rhes564:9000/user/hduser/yearAndCancelled.parquet"
>> try { hdfs.delete(new org.apache.hadoop.fs.Path(output1), true) } catch { case _ : Throwable => { } }
>> val output2 = "hdfs://rhes564:9000/user/hduser/output.csv"
>> try { hdfs.delete(new org.apache.hadoop.fs.Path(output2), true) } catch { case _ : Throwable => { } }
>> // test write to parquet is fast
>> df_4.select("Year", "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>> val selectedData = df_4.select("Year", "Cancelled")
>> val howLong = TimeTaken(selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("output.csv"))
>> println ("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
>> sys.exit()
>>
>>
>> My results are as follows
>>
>> Started at
>> [20/03/2016 10:02:02.02]
>>
>> Elapsed time: 63984582000ns
>> howLong: Unit = ()
>> Finished at
>> [20/03/2016 10:04:59.59]
>>
>> So the whole job finished in just under 3 minutes. Saving output.csv took
>> about 64 seconds. That CSV file has 7,009,728 rows.
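
The raw nanosecond readings in this thread are easier to compare once converted. Below is a minimal sketch in plain Scala (no Spark needed) that turns the two reported elapsed times into seconds and minutes, which come out at roughly 64 seconds and roughly 58 minutes. The helper name `nanosToSeconds` and the value names are made up for illustration; the figures are the ones printed in this thread.

```scala
// Hypothetical helper: convert a System.nanoTime() difference to seconds.
def nanosToSeconds(ns: Long): Double = ns / 1e9

// Elapsed times as printed in this thread.
val fastCsvWriteNs = 63984582000L    // Spark 1.5.2 run
val slowCsvWriteNs = 3488272270000L  // 2.0.0-SNAPSHOT run

val fastSeconds = nanosToSeconds(fastCsvWriteNs)
val slowMinutes = nanosToSeconds(slowCsvWriteNs) / 60

println(f"fast run: $fastSeconds%.1f s")
println(f"slow run: $slowMinutes%.1f min")
println(f"slowdown: ${slowCsvWriteNs.toDouble / fastCsvWriteNs}%.1fx")
```

The snippet can be pasted into any Scala REPL, including spark-shell.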
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn:
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 19 March 2016 at 22:36, Vincent Ohprecio <[email protected]> wrote:
>>
>>> Parquet works super fast, but the write to CSV took an hour. Someone else
>>> tested with 1.5 and it was fast. I'm going to try a few more setups. I'm
>>> testing 2.0.
>>>
>>> thanks for your help
>>>
>>> On Mar 19, 2016, at 3:30 PM, Mich Talebzadeh <[email protected]>
>>> wrote:
>>>
>>> Hi Vince,
>>>
>>> We had a similar case a while back. I tried two setups: Spark on the Hive
>>> metastore, and Hive on the Spark engine.
>>>
>>> Hive version 2
>>> Spark as Hive engine 1.3.1
>>>
>>> Basically
>>>
>>> --1 Move .CSV data into HDFS:
>>> --2 Create an external table (all columns as string)
>>> --3 Create the ORC table (majority Int)
>>> --4 Insert the data from the external table to the Hive ORC table
>>> compressed as zlib
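
Steps 2 and 3 describe the same column list twice, once with every column as string and once with real types. As a rough illustration only, the sketch below derives both column lists from one definition in plain Scala. `Col`, `columnList` and `createTable` are hypothetical names, only four of the columns are shown, and the output is just the opening of each statement, without the storage clauses a real DDL needs.

```scala
// Hypothetical model of a Hive column: a name and a Hive type.
final case class Col(name: String, hiveType: String)

// A few columns from the 2008 airline data, with their target (ORC) types.
val typed = Seq(
  Col("Year", "int"),
  Col("Month", "int"),
  Col("UniqueCarrier", "string"),
  Col("ArrDelay", "int")
)

// Step 2: the staging table reads every column as string.
val staging = typed.map(_.copy(hiveType = "string"))

// Render the column list in the leading-comma layout used in this thread.
def columnList(cols: Seq[Col]): String =
  cols.map(c => s"${c.name} ${c.hiveType}").mkString("   ", "\n,  ", "")

// Render only the CREATE ... ( columns ) part of the statement.
def createTable(name: String, cols: Seq[Col], external: Boolean): String = {
  val kind = if (external) "CREATE EXTERNAL TABLE" else "CREATE TABLE"
  s"$kind $name (\n${columnList(cols)}\n)"
}

println(createTable("stg_t2", staging, external = true))
println(createTable("t2008", typed, external = false))
```

Keeping one list makes it harder for the staging and target tables to drift apart when a column is added or retyped.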
>>>
>>> ORC seems a good candidate in this case, as a simple insert/select
>>> from the external table to ORC takes very little time. I bucketed the ORC
>>> table and marked it as transactional in case one needs to make a
>>> correction to it (not really needed).
>>>
>>> The whole process was timestamped. It took about 5 minutes to complete
>>> and there were 7,009,728 rows in total.
>>>
>>>
>>> +-------------------------+--+
>>> |        starttime        |
>>> +-------------------------+--+
>>> | 19/03/2016 22:21:19.19  |
>>> +-------------------------+--+
>>>
>>> +-------------------------+--+
>>> |         endtime         |
>>> +-------------------------+--+
>>> | 19/03/2016 22:26:12.12  |
>>> +-------------------------+--+
>>>
>>>
>>>
>>> This is the code. I will try spark code later with parquet
>>>
>>> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS
>>> StartTime;
>>> set hive.exec.reducers.max=256;
>>> use test;
>>> --set hive.execution.engine=mr;
>>> --2)
>>> DROP TABLE IF EXISTS stg_t2;
>>> CREATE EXTERNAL TABLE stg_t2 (
>>>    Year         string
>>> ,  Month        string
>>> ,  DayofMonth   string
>>> ,  DayOfWeek    string
>>> ,  DepTime      string
>>> ,  CRSDepTime   string
>>> ,  ArrTime      string
>>> ,  CRSArrTime   string
>>> ,  UniqueCarrier        string
>>> ,  FlightNum    string
>>> ,  TailNum      string
>>> ,  ActualElapsedTime    string
>>> ,  CRSElapsedTime       string
>>> ,  AirTime      string
>>> ,  ArrDelay     string
>>> ,  DepDelay     string
>>> ,  Origin       string
>>> ,  Dest         string
>>> ,  Distance     string
>>> ,  TaxiIn       string
>>> ,  TaxiOut      string
>>> ,  Cancelled    string
>>> ,  CancellationCode     string
>>> ,  Diverted     string
>>> ,  CarrierDelay         string
>>> ,  WeatherDelay         string
>>> ,  NASDelay     string
>>> ,  SecurityDelay        string
>>> ,  LateAircraftDelay    string
>>> )
>>> COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
>>> ROW FORMAT serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
>>> STORED AS TEXTFILE
>>> LOCATION 'hdfs://rhes564:9000/data/stg2'
>>> TBLPROPERTIES ("skip.header.line.count"="1")
>>> ;
>>> --3)
>>> DROP TABLE IF EXISTS t2008;
>>> CREATE TABLE t2008 (
>>>    Year         int
>>> ,  Month        int
>>> ,  DayofMonth   int
>>> ,  DayOfWeek    int
>>> ,  DepTime      string
>>> ,  CRSDepTime   string
>>> ,  ArrTime      string
>>> ,  CRSArrTime   string
>>> ,  UniqueCarrier        string
>>> ,  FlightNum    int
>>> ,  TailNum      string
>>> ,  ActualElapsedTime    int
>>> ,  CRSElapsedTime       int
>>> ,  AirTime      int
>>> ,  ArrDelay     int
>>> ,  DepDelay     int
>>> ,  Origin       string
>>> ,  Dest         string
>>> ,  Distance     int
>>> ,  TaxiIn       int
>>> ,  TaxiOut      int
>>> ,  Cancelled    string
>>> ,  CancellationCode     string
>>> ,  Diverted     string
>>> ,  CarrierDelay         int
>>> ,  WeatherDelay         int
>>> ,  NASDelay     int
>>> ,  SecurityDelay        int
>>> ,  LateAircraftDelay    int
>>> )
>>> COMMENT 'from csv file from http://stat-computing.org/dataexpo/2009/the-data.html, year 2008'
>>> CLUSTERED BY (Year, Month, DayofMonth, DayOfWeek, DepTime) INTO 256
>>> BUCKETS
>>> STORED AS ORC
>>> TBLPROPERTIES ( "orc.compress"="ZLIB",
>>> "transactional"="true")
>>> ;
>>> --4) Put data in target table. do the conversion and ignore empty rows
>>> INSERT INTO TABLE t2008
>>> SELECT
>>>           *
>>> FROM
>>> stg_t2
>>> ;
>>> --select count(1) from t2008
>>> ;
>>> select from_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') AS
>>> EndTime;
>>> !exit
>>>
>>> HTH
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>> On 19 March 2016 at 19:18, Vincent Ohprecio <[email protected]> wrote:
>>>
>>>>
>>>> For some reason, writing data from the Spark shell to CSV using the `csv
>>>> package` takes almost an hour to dump to disk. Am I going crazy or did I do
>>>> this wrong? I tried writing to parquet first and it's fast as normal.
>>>>
>>>> On my MacBook Pro (16 GB, 2.2 GHz Intel Core i7, 1 TB) the CPUs go
>>>> crazy and it sounds like it's taking off like a plane ... lol
>>>>
>>>> Here is the code if anyone wants to experiment:
>>>>
>>>> // ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0
>>>> //
>>>> // version 2.0.0-SNAPSHOT
>>>> // Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
>>>> // http://stat-computing.org/dataexpo/2009/the-data.html
>>>>
>>>> def time[R](block: => R): R = {
>>>>     val t0 = System.nanoTime()
>>>>     val result = block    // call-by-name
>>>>     val t1 = System.nanoTime()
>>>>     println("Elapsed time: " + (t1 - t0) + "ns")
>>>>     result
>>>> }
>>>>
>>>> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/Users/employee/Downloads/2008.csv")
>>>> val df_1 = df.withColumnRenamed("Year","oldYear")
>>>> val df_2 = df_1.withColumn("Year",df_1.col("oldYear").cast("int")).drop("oldYear")
>>>>
>>>> def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, newType:String) = {
>>>>   val df_1 = df.withColumnRenamed(name, "swap")
>>>>   df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap")
>>>> }
>>>>
>>>> val df_3 = convertColumn(df_2, "ArrDelay", "int")
>>>> // chain from df_3 so the ArrDelay cast is not dropped
>>>> val df_4 = convertColumn(df_3, "DepDelay", "int")
>>>>
>>>>
>>>> // test write to parquet is fast
>>>> df_4.select("Year", "Cancelled").write.format("parquet").save("yearAndCancelled.parquet")
>>>>
>>>> val selectedData = df_4.select("Year", "Cancelled")
>>>>
>>>> val howLong = time(selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("output.csv"))
>>>>
>>>> //scala> val howLong = time(selectedData.write.format("com.databricks.spark.csv").option("header", "true").save("output.csv"))
>>>> //Elapsed time: 3488272270000ns
>>>> //howLong: Unit = ()
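
The `howLong: Unit = ()` line shows that the helper returns whatever the timed block returns, and `save` returns `Unit`, so the elapsed time is printed but never kept. One possible variant, sketched here on the assumption that keeping the number is useful, returns the result together with the elapsed nanoseconds. `timed` is a made-up name and is not part of Spark or spark-csv; the workload is a stand-in so the sketch runs without Spark.

```scala
// Hypothetical variant of the timing helper: returns the block's result
// together with the elapsed time in nanoseconds.
def timed[R](block: => R): (R, Long) = {
  val t0 = System.nanoTime()
  val result = block // call-by-name: evaluated here, exactly once
  val elapsedNs = System.nanoTime() - t0
  (result, elapsedNs)
}

// Stand-in workload so the sketch runs without Spark.
val (sum, elapsedNs) = timed { (1L to 1000000L).sum }
println(s"sum = $sum, elapsed = ${elapsedNs / 1e6} ms")
```

In the shell, the same helper could wrap the CSV write in place of the summation, and the second element of the tuple would then hold the write time.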
>>>>
>>>> https://gist.github.com/bigsnarfdude/581b780ce85d7aaecbcb
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
