Re: Does Spark CSV accept a CSV String

2016-03-31 Thread Mich Talebzadeh
Well, my guess is to just pkunzip it and either re-compress it with bzip2 or
leave it as it is.

Databricks handles *.bz2 type files. I know that.

Anyway that is the easy part :)
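
For what it is worth, here is a minimal sketch of reading the bzip2-compressed
csv files straight off hdfs with the spark-csv package (Spark 1.x syntax; the
glob and options are illustrative only). Hadoop's BZip2Codec decompresses *.bz2
input transparently, so no separate unzip step is needed:

// spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772/*.bz2")  // staging dir used later in the thread
df.printSchema()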

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 31 March 2016 at 01:02, Benjamin Kim  wrote:

> Hi Mich,
>
> I forgot to mention that - this is the ugly part - the source data
> provider gives us (Windows) pkzip-compressed files. Will Spark uncompress
> these automatically? I haven’t been able to make it work.
>
> Thanks,
> Ben
>
> On Mar 30, 2016, at 2:27 PM, Mich Talebzadeh 
> wrote:
>
> Hi Ben,
>
> Well I have done it for standard csv files downloaded from spreadsheets to
> staging directory on hdfs and loaded from there.
>
> First, you may not need to unzip them; Databricks (in my case) can read
> zipped files as well.
>
> Check this. Mine is slightly different from what you have: first I compress my
> csv files with bzip2 and load them into hdfs.
>
> #!/bin/ksh
> DIR="/data/stg/accounts/nw/10124772"
> #
> ## Compress the files
> #
> echo `date` " ""===  Started compressing all csv FILEs"
> for FILE in `ls *.csv`
> do
>   /usr/bin/bzip2 ${FILE}
> done
> #
> ## Clear out hdfs staging directory
> #
> echo `date` " ""===  Started deleting old files from hdfs staging
> directory ${DIR}"
> hdfs dfs -rm -r ${DIR}/*.bz2
> echo `date` " ""===  Started Putting bz2 fileS to hdfs staging
> directory ${DIR}"
> for FILE in `ls *.bz2`
> do
>   hdfs dfs -copyFromLocal ${FILE} ${DIR}
> done
> echo `date` " ""===  Checking that all files are moved to hdfs staging
> directory"
> hdfs dfs -ls ${DIR}
> exit 0
>
> Now you have all your csv files in the staging directory
>
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("
> hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
> case class Accounts( TransactionDate: String, TransactionType: String,
> Description: String, Value: Double, Balance: Double, AccountName: String,
> AccountNumber : String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p =>
> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
>
> // Need to create and populate target ORC table nw_10124772 in database
> accounts in Hive
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
> TransactionDate    DATE
> ,TransactionType   String
> ,Description   String
> ,Value Double
> ,Balance   Double
> ,AccountName   String
> ,AccountNumber Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>
> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
> AS TransactionDate
> , TransactionType
> , Description
> , Value
> , Balance
> , AccountName
> , AccountNumber
> FROM tmp
> """
> sql(sqltext)
>
> println ("\nFinished at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
>
> Once you store the data in some form of table (Parquet, ORC, etc.) you can do
> whatever you like with it.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 30 March 2016 at 22:13, Benjamin Kim  wrote:
>
>> Hi Mich,
>>
>> You are correct. I am talking about the Databricks package spark-csv you
>> have below.
>>
>> The files are stored in s3 and I download, unzip, and store each one of
>> them in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
>>
>> Here is some of the code.
>>
>> val filesRdd = sc.parallelize(lFiles, 250)
>> filesRdd.foreachPartition(files => {
>>   val s3Client = new AmazonS3Client(new
>> EnvironmentVariableCredentialsProvider())
>>   files.foreach(file => {
>>

Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
Hi Mich,

I forgot to mention that - this is the ugly part - the source data provider
gives us (Windows) pkzip-compressed files. Will Spark uncompress these
automatically? I haven’t been able to make it work.
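
(For the record, Spark will not unzip *.zip files on its own -- unlike gzip or
bzip2 there is no Hadoop compression codec for pkzip -- so the archives have to
be opened manually. Below is a rough sketch under that assumption, reading the
archives as binary files; the s3n path is hypothetical and each archive is
assumed to hold a single csv entry.)

import java.util.zip.ZipInputStream
import scala.io.Source

// *.zip files come back as opaque binary blobs and are unpacked in the executors.
val csvLines = sc.binaryFiles("s3n://my-bucket/incoming/*.zip")
  .flatMap { case (_, pds) =>
    val zis = new ZipInputStream(pds.open())
    zis.getNextEntry                                // position on the first (only) entry
    Source.fromInputStream(zis).getLines().toList   // materialise before the stream is closed
  }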

Thanks,
Ben

> On Mar 30, 2016, at 2:27 PM, Mich Talebzadeh  
> wrote:
> 
> Hi Ben,
> 
> Well I have done it for standard csv files downloaded from spreadsheets to 
> staging directory on hdfs and loaded from there.
> 
> First, you may not need to unzip them; Databricks (in my case) can read zipped
> files as well.
>
> Check this. Mine is slightly different from what you have: first I compress my csv
> files with bzip2 and load them into hdfs.
> 
> #!/bin/ksh
> DIR="/data/stg/accounts/nw/10124772"
> #
> ## Compress the files
> #
> echo `date` " ""===  Started compressing all csv FILEs"
> for FILE in `ls *.csv`
> do
>   /usr/bin/bzip2 ${FILE}
> done
> #
> ## Clear out hdfs staging directory
> #
> echo `date` " ""===  Started deleting old files from hdfs staging 
> directory ${DIR}"
> hdfs dfs -rm -r ${DIR}/*.bz2
> echo `date` " ""===  Started Putting bz2 fileS to hdfs staging directory 
> ${DIR}"
> for FILE in `ls *.bz2`
> do
>   hdfs dfs -copyFromLocal ${FILE} ${DIR}
> done
> echo `date` " ""===  Checking that all files are moved to hdfs staging 
> directory"
> hdfs dfs -ls ${DIR}
> exit 0
> 
> Now you have all your csv files in the staging directory
> 
> import org.apache.spark.sql.functions._
> import java.sql.{Date, Timestamp}
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> 
> val df = 
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", 
> "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
> case class Accounts( TransactionDate: String, TransactionType: String, 
> Description: String, Value: Double, Balance: Double, AccountName: String, 
> AccountNumber : String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p => 
> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
> 
> // Need to create and populate target ORC table nw_10124772 in database 
> accounts in Hive
> //
> sql("use accounts")
> //
> // Drop and create table nw_10124772
> //
> sql("DROP TABLE IF EXISTS accounts.nw_10124772")
> var sqltext : String = ""
> sqltext = """
> CREATE TABLE accounts.nw_10124772 (
> TransactionDate    DATE
> ,TransactionType   String
> ,Description   String
> ,Value Double
> ,Balance   Double
> ,AccountName   String
> ,AccountNumber Int
> )
> COMMENT 'from csv file from excel sheet'
> STORED AS ORC
> TBLPROPERTIES ( "orc.compress"="ZLIB" )
> """
> sql(sqltext)
> //
> // Put data in Hive table. Clean up is already done
> //
> sqltext = """
> INSERT INTO TABLE accounts.nw_10124772
> SELECT
>   
> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
>  AS TransactionDate
> , TransactionType
> , Description
> , Value
> , Balance
> , AccountName
> , AccountNumber
> FROM tmp
> """
> sql(sqltext)
> 
> println ("\nFinished at"); sqlContext.sql("SELECT 
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)
> 
> Once you store the data in some form of table (Parquet, ORC, etc.) you can do
> whatever you like with it.
> 
> HTH
> 
> Dr Mich Talebzadeh
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
> 
> On 30 March 2016 at 22:13, Benjamin Kim wrote:
> Hi Mich,
> 
> You are correct. I am talking about the Databricks package spark-csv you have 
> below.
> 
> The files are stored in s3 and I download, unzip, and store each one of them 
> in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
> 
> Here is some of the code.
> 
> val filesRdd = sc.parallelize(lFiles, 250)
> filesRdd.foreachPartition(files => {
>   val s3Client = new AmazonS3Client(new 
> EnvironmentVariableCredentialsProvider())
>   files.foreach(file => {
> val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
> val zipFile = new ZipInputStream(s3Object.getObjectContent())
> val csvFile = readZipStream(zipFile)
>   })
> })
> 
> This function does the unzipping and converts to string.
> 
> def readZipStream(stream: ZipInputStream): String = {
>   stream.getNextEntry
>   var stuff = new 

Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Mich Talebzadeh
Hi Ben,

Well I have done it for standard csv files downloaded from spreadsheets to
staging directory on hdfs and loaded from there.

First, you may not need to unzip them; Databricks (in my case) can read
zipped files as well.

Check this. Mine is slightly different from what you have: first I compress my
csv files with bzip2 and load them into hdfs.

#!/bin/ksh
DIR="/data/stg/accounts/nw/10124772"
#
## Compress the files
#
echo `date` " ""===  Started compressing all csv FILEs"
for FILE in `ls *.csv`
do
  /usr/bin/bzip2 ${FILE}
done
#
## Clear out hdfs staging directory
#
echo `date` " ""===  Started deleting old files from hdfs staging
directory ${DIR}"
hdfs dfs -rm -r ${DIR}/*.bz2
echo `date` " ""===  Started Putting bz2 fileS to hdfs staging
directory ${DIR}"
for FILE in `ls *.bz2`
do
  hdfs dfs -copyFromLocal ${FILE} ${DIR}
done
echo `date` " ""===  Checking that all files are moved to hdfs staging
directory"
hdfs dfs -ls ${DIR}
exit 0

Now you have all your csv files in the staging directory

import org.apache.spark.sql.functions._
import java.sql.{Date, Timestamp}
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("\nStarted at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
").collect.foreach(println)

val df =
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header",
"true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
case class Accounts( TransactionDate: String, TransactionType: String,
Description: String, Value: Double, Balance: Double, AccountName: String,
AccountNumber : String)
// Map the columns to names
//
val a = df.filter(col("Date") > "").map(p =>
Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
//
// Create a Spark temporary table
//
a.toDF.registerTempTable("tmp")

// Need to create and populate target ORC table nw_10124772 in database
accounts in Hive
//
sql("use accounts")
//
// Drop and create table nw_10124772
//
sql("DROP TABLE IF EXISTS accounts.nw_10124772")
var sqltext : String = ""
sqltext = """
CREATE TABLE accounts.nw_10124772 (
TransactionDate    DATE
,TransactionType   String
,Description   String
,Value Double
,Balance   Double
,AccountName   String
,AccountNumber Int
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
sql(sqltext)
//
// Put data in Hive table. Clean up is already done
//
sqltext = """
INSERT INTO TABLE accounts.nw_10124772
SELECT

TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(TransactionDate,'dd/MM/yyyy'),'yyyy-MM-dd'))
AS TransactionDate
, TransactionType
, Description
, Value
, Balance
, AccountName
, AccountNumber
FROM tmp
"""
sql(sqltext)

println ("\nFinished at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss') ").collect.foreach(println)

Once you store the data in some form of table (Parquet, ORC, etc.) you can do
whatever you like with it.
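
For instance, a trivial illustration of reading the ORC-backed table straight
back through the same HiveContext (nothing beyond standard Spark SQL here):

sqlContext.sql("""SELECT AccountName, SUM(Value) AS total
                  FROM accounts.nw_10124772
                  GROUP BY AccountName""").show()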

HTH

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 30 March 2016 at 22:13, Benjamin Kim  wrote:

> Hi Mich,
>
> You are correct. I am talking about the Databricks package spark-csv you
> have below.
>
> The files are stored in s3 and I download, unzip, and store each one of
> them in a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).
>
> Here is some of the code.
>
> val filesRdd = sc.parallelize(lFiles, 250)
> filesRdd.foreachPartition(files => {
>   val s3Client = new AmazonS3Client(new
> EnvironmentVariableCredentialsProvider())
>   files.foreach(file => {
> val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
> val zipFile = new ZipInputStream(s3Object.getObjectContent())
> val csvFile = readZipStream(zipFile)
>   })
> })
>
> This function does the unzipping and converts to string.
>
> def readZipStream(stream: ZipInputStream): String = {
>   stream.getNextEntry
>   var stuff = new ListBuffer[String]()
>   val scanner = new Scanner(stream)
>   while(scanner.hasNextLine){
> stuff += scanner.nextLine
>   }
>   stuff.toList.mkString("\n")
> }
>
> The next step is to parse the CSV string and convert to a dataframe, which
> will populate a Hive/HBase table.
>
> If you can help, I would be truly grateful.
>
> Thanks,
> Ben
>
>
> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh 
> wrote:
>
> Just to clarify, are you talking about the Databricks csv package?
>
> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
>
> Where are these zipped files? Are they copied to a staging directory in
> hdfs?
>
> HTH
>
> Dr Mich Talebzadeh

Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Benjamin Kim
Hi Mich,

You are correct. I am talking about the Databricks package spark-csv you have 
below.

The files are stored in s3 and I download, unzip, and store each one of them in 
a variable as a string using the AWS SDK (aws-java-sdk-1.10.60.jar).

Here is some of the code.

val filesRdd = sc.parallelize(lFiles, 250)
filesRdd.foreachPartition(files => {
  val s3Client = new AmazonS3Client(new 
EnvironmentVariableCredentialsProvider())
  files.foreach(file => {
val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
val zipFile = new ZipInputStream(s3Object.getObjectContent())
val csvFile = readZipStream(zipFile)
  })
})

This function does the unzipping and converts the contents to a string.

def readZipStream(stream: ZipInputStream): String = {
  stream.getNextEntry
  var stuff = new ListBuffer[String]()
  val scanner = new Scanner(stream)
  while(scanner.hasNextLine){
stuff += scanner.nextLine
  }
  stuff.toList.mkString("\n")
}

The next step is to parse the CSV string and convert to a dataframe, which will 
populate a Hive/HBase table.
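
One rough way to wire this up with the spark-csv package (a sketch only: it
assumes the csv text is returned from the partitions instead of being discarded
inside foreachPartition, reuses the readZipStream helper above, and relies on
spark-csv 1.x exposing CsvParser.csvRdd for RDD[String] input):

import com.databricks.spark.csv.CsvParser
import org.apache.spark.rdd.RDD

// Keep the unzipped csv text instead of throwing it away, then split it
// back into lines so spark-csv can parse it as an RDD[String].
val csvStrings: RDD[String] = filesRdd.mapPartitions { files =>
  val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider())
  files.map { file =>
    val s3Object = s3Client.getObject(new GetObjectRequest(s3Bucket, file))
    readZipStream(new ZipInputStream(s3Object.getObjectContent()))
  }
}
val csvLines = csvStrings.flatMap(_.split("\n"))
// Note: if every file carries its own header row, the duplicate header lines
// may need filtering out before parsing.

val df = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)
  .csvRdd(sqlContext, csvLines)
df.registerTempTable("csv_staging")   // ready to feed the Hive/HBase-backed table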

If you can help, I would be truly grateful.

Thanks,
Ben


> On Mar 30, 2016, at 2:06 PM, Mich Talebzadeh  
> wrote:
> 
> Just to clarify, are you talking about the Databricks csv package?
> 
> $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
> 
> Where are these zipped files? Are they copied to a staging directory in hdfs?
> 
> HTH
> 
> Dr Mich Talebzadeh
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
> 
> On 30 March 2016 at 15:17, Benjamin Kim wrote:
> I have a quick question. I have downloaded multiple zipped files from S3 and 
> unzipped each one of them into strings. The next step is to parse using a CSV 
> parser. I want to know if there is a way to easily use the spark csv package 
> for this?
> 
> Thanks,
> Ben



Re: Does Spark CSV accept a CSV String

2016-03-30 Thread Mich Talebzadeh
Just to clarify, are you talking about the Databricks csv package?

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.3.0
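
(The same package can be pulled in outside the shell as well -- for example via
spark-submit --packages, or as an sbt dependency in a standalone build. The
version below simply mirrors the shell example and may need adjusting to the
local Spark/Scala versions; class and jar names are placeholders:)

// spark-submit --packages com.databricks:spark-csv_2.11:1.3.0 --class MyApp myapp.jar
libraryDependencies += "com.databricks" %% "spark-csv" % "1.3.0"   // build.sbt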

Where are these zipped files? Are they copied to a staging directory in
hdfs?

HTH

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 30 March 2016 at 15:17, Benjamin Kim  wrote:

> I have a quick question. I have downloaded multiple zipped files from S3
> and unzipped each one of them into strings. The next step is to parse using
> a CSV parser. I want to know if there is a way to easily use the spark csv
> package for this?
>
> Thanks,
> Ben