Hi Bijay.

At the moment it is only a POC: getting CSV data for invoices on a daily
basis, importing it into HDFS and storing it in an ORC table (non-transactional,
as Spark cannot read transactional ORC tables) in a Hive database.

I have written both a Hive version and a Spark version. The Hive version below
is pretty stable. The source file is full of £ signs that Linux cannot
translate when the file is imported from Windows into the staging area, hence
the REGEXP_REPLACE clean-up below.

DROP TABLE IF EXISTS t2;
CREATE TABLE t2 (
 INVOICENUMBER          INT
,PAYMENTDATE            date
,NET                    DECIMAL(20,2)
,VAT                    DECIMAL(20,2)
,TOTAL                  DECIMAL(20,2)
)
COMMENT 'from csv file from excel sheet xxx'
CLUSTERED BY (INVOICENUMBER) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
--"transactional"="true")
;
--4) Put data in target table. do the conversion and ignore empty rows
INSERT INTO TABLE t2
SELECT
          INVOICENUMBER
        , TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'dd/MM/yyyy'),'yyyy-MM-dd')) AS paymentdate
        , CAST(REGEXP_REPLACE(net,'[^\\d\\.]','') AS DECIMAL(20,2))
        , CAST(REGEXP_REPLACE(vat,'[^\\d\\.]','') AS DECIMAL(20,2))
        , CAST(REGEXP_REPLACE(total,'[^\\d\\.]','') AS DECIMAL(20,2))
FROM
stg_t2
WHERE
--        INVOICENUMBER > 0 AND
        CAST(REGEXP_REPLACE(total,'[^\\d\\.]','') AS DECIMAL(20,2)) > 0.0
-- Exclude empty rows
;


This works OK.

Now I wanted to write the same in Spark, using functional programming and no
temp table.

This is a sample program:

val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("/data/stg/table2")
// define the schema using a case class
case class Invoices(Invoicenumber: String, Paymentdate: String, Net: Double, VAT: Double, Total: Double)

// drop rows with an empty Total, strip the leading £ sign and the thousands
// commas from the money columns, and convert them to Double
val a = df.filter(col("Total") > "").map(p => Invoices(p(0).toString,
  p(1).toString,
  p(2).toString.substring(1).replace(",", "").toDouble,
  p(3).toString.substring(1).replace(",", "").toDouble,
  p(4).toString.substring(1).replace(",", "").toDouble))
a.first
//
// convert this RDD to DF and create a Spark temporary table
//
a.toDF.registerTempTable("tmp")
//
// Need to create and populate target ORC table t14 in database test in Hive
//
sql("use test")
//
// Drop and create table t14
//
sql("DROP TABLE IF EXISTS t14")
var sqltext : String = ""
sqltext = """
CREATE TABLE t14 (
 INVOICENUMBER          INT
,PAYMENTDATE            DATE
,NET                    DECIMAL(20,2)
,VAT                    DECIMAL(20,2)
,TOTAL                  DECIMAL(20,2)
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
sql(sqltext)
//
// Put data in Hive table. Clean up is already done
//
sqltext = "INSERT INTO TABLE t14 SELECT * FROM tmp"
sql(sqltext)
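
As an aside on the "no temp table" goal above: one possibility I have not yet
tested (so treat it as a sketch rather than working code) is to skip
registerTempTable and write the DataFrame straight into the Hive table with
insertInto. insertInto matches columns by position, so this assumes the
DataFrame columns are already in the right order and castable to t14's
INT/DATE/DECIMAL columns:

// untested sketch: write the cleaned-up DataFrame directly into test.t14,
// assuming the column order and types already line up with the table definition
a.toDF().write.insertInto("test.t14")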



However, I need to do date manipulation on p(1).toString so that the date is
stored the same way as in the Hive version. How do I do that here?
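
Something along these lines is what I have in mind, but both are untested
sketches on my part (the column names come from my program above), so
corrections welcome:

// (a) do the conversion at insert time, with the same expression as the Hive version
sqltext = """
INSERT INTO TABLE t14
SELECT
  Invoicenumber
, TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(Paymentdate,'dd/MM/yyyy'),'yyyy-MM-dd'))
, Net
, VAT
, Total
FROM tmp
"""
sql(sqltext)

// (b) or convert inside the map, so tmp already holds yyyy-MM-dd strings
// (a cast may still be needed when inserting into the DATE column)
import java.text.SimpleDateFormat
def toIso(s: String): String = {
  // SimpleDateFormat is not thread-safe, hence a fresh instance per call in this sketch
  val in  = new SimpleDateFormat("dd/MM/yyyy")
  val out = new SimpleDateFormat("yyyy-MM-dd")
  out.format(in.parse(s))
}
// ...and in the map: Invoices(p(0).toString, toIso(p(1).toString), ...)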

Thanks



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 24 March 2016 at 17:34, Bijay Pathak <[email protected]> wrote:

> Hi,
>
> I have written a UDF to do the same in a PySpark DataFrame, since some of
> my dates are before the Unix epoch of 1/1/1970. I have more than
> 250 columns and apply a custom date_format UDF to more than 50 of them.
> I am getting OOM errors and poor performance because of the UDF.
>
> What's your data size and how is the performance?
>
> Thanks,
> Bijay
>
> On Thu, Mar 24, 2016 at 10:19 AM, Mich Talebzadeh <
> [email protected]> wrote:
>
>> Minor correction: the UK date format is dd/MM/yyyy
>>
>> scala> sql("select paymentdate,
>> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'dd/MM/yyyy'),'yyyy-MM-dd'))
>> AS newdate from tmp").first
>> res47: org.apache.spark.sql.Row = [10/02/2014,2014-02-10]
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 24 March 2016 at 17:09, Mich Talebzadeh <[email protected]>
>> wrote:
>>
>>> Thanks everyone. Appreciated
>>>
>>> sql("select paymentdate,
>>> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'MM/dd/yyyy'),'yyyy-MM-dd'))
>>> from tmp").first
>>> res45: org.apache.spark.sql.Row = [10/02/2014,2014-10-02]
>>>
>>> Cracking a nut with a sledgehammer :)
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 24 March 2016 at 17:03, Kasinathan, Prabhu <[email protected]>
>>> wrote:
>>>
>>>> Can you try this one?
>>>>
>>>> spark-sql> select paymentdate,
>>>> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'MM/dd/yyyy'),'yyyy-MM-dd'))
>>>> from tmp;
>>>> 10/02/2014 2014-10-02
>>>> spark-sql>
>>>>
>>>>
>>>> From: Tamas Szuromi <[email protected]>
>>>> Date: Thursday, March 24, 2016 at 9:35 AM
>>>> To: Mich Talebzadeh <[email protected]>
>>>> Cc: Ajay Chander <[email protected]>, Tamas Szuromi <
>>>> [email protected]>, "user @spark" <[email protected]
>>>> >
>>>> Subject: Re: Converting a string of format of 'dd/MM/yyyy' in Spark sql
>>>>
>>>> Actually, you should run sql("select paymentdate,
>>>> unix_timestamp(paymentdate, 'dd/MM/yyyy') from tmp").first
>>>>
>>>>
>>>> But keep in mind you will get a unix timestamp!
>>>>
>>>>
>>>> On 24 March 2016 at 17:29, Mich Talebzadeh <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks guys.
>>>>>
>>>>> Unfortunately neither is working
>>>>>
>>>>>  sql("select paymentdate, unix_timestamp(paymentdate) from tmp").first
>>>>> res28: org.apache.spark.sql.Row = [10/02/2014,null]
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>>
>>>>> On 24 March 2016 at 14:23, Ajay Chander <[email protected]> wrote:
>>>>>
>>>>>> Mich,
>>>>>>
>>>>>> Can you try changing the value of paymentdate to this
>>>>>> format, paymentdate='2015-01-01 23:59:59', then to_date(paymentdate), and
>>>>>> see if it helps.
>>>>>>
>>>>>>
>>>>>> On Thursday, March 24, 2016, Tamas Szuromi <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Mich,
>>>>>>>
>>>>>>> Take a look
>>>>>>> https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/functions.html#unix_timestamp(org.apache.spark.sql.Column,%20java.lang.String)
>>>>>>>
>>>>>>> cheers,
>>>>>>> Tamas
>>>>>>>
>>>>>>>
>>>>>>> On 24 March 2016 at 14:29, Mich Talebzadeh <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to convert a date in a Spark temporary table.
>>>>>>>>
>>>>>>>> I tried a few approaches.
>>>>>>>>
>>>>>>>> scala> sql("select paymentdate, to_date(paymentdate) from tmp")
>>>>>>>> res21: org.apache.spark.sql.DataFrame = [paymentdate: string, _c1:
>>>>>>>> date]
>>>>>>>>
>>>>>>>>
>>>>>>>> scala> sql("select paymentdate, to_date(paymentdate) from
>>>>>>>> tmp").first
>>>>>>>> res22: org.apache.spark.sql.Row = [10/02/2014,null]
>>>>>>>>
>>>>>>>> My date is stored as a String in dd/MM/yyyy format, as shown above. However,
>>>>>>>> to_date() returns null!
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>
