Re: Structured Streaming + UDF - logic based on checking if a column is present in the Dataframe

2022-02-25 Thread Gourav Sengupta
Hi,

Can you please let us know the following:
1. the Spark version
2. a few samples of the input data
3. a few samples of the expected output that you want


Regards,
Gourav Sengupta

On Wed, Feb 23, 2022 at 8:43 PM karan alang  wrote:

> Hello All,
>
> I'm using StructuredStreaming, and am trying to use UDF to parse each row.
> Here is the requirement:
>
>- we can get alerts of a particular KPI with type 'major' OR 'critical'
>- for a KPI, if we get alerts of type 'major' eg _major, and we have a
>critical alert as well _critical, we need to ignore the _major alert, and
>consider _critical alert only
>
> There are ~25 alerts which are stored in the array (AlarmKeys.alarm_all)
>
> UDF Code (draft):
>
> @udf(returnType=StringType())
> def convertStructToStr(APP_CAUSE, tenantName, window,,__major,__major, __critical, five__major, __critical):
>
>     res = "{window: "+ str(window) + "type: 10m, applianceName: "+ str(APP_CAUSE)+","
>     first = True
>     for curr_alarm in AlarmKeys.alarms_all:
>         alsplit = curr_alarm.split('__')
>         if len(alsplit) == 2:
>             # Only account for critical row if both major & critical are there
>             if alsplit[1] == 'major':
>                 critical_alarm = alsplit[0] + "__critical"
>                 if int(col(critical_alarm)) > 0:
>                     continue
>         if int(col(curr_alarm)) > 0:
>             if first:
>                 mystring = "{} {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
>                 first = False
>             else:
>                 mystring = "{}, {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
>     res += "insight: " + mystring + "}"
>
> # structured streaming using udf, this is printing data on console
> # eventually, i'll put data into Kafka instead
> df.select(convertStructToStr(*df.columns)) \
>     .write \
>     .format("console") \
>     .option("numRows", 100) \
>     .option("checkpointLocation", "/Users/karanalang/PycharmProjects/Kafka/checkpoint") \
>     .option("outputMode", "complete") \
>     .save("output")
>
> Additional Details in stackoverflow :
>
> https://stackoverflow.com/questions/71243726/structured-streaming-udf-logic-based-on-checking-if-a-column-is-present-in-t
>
>
> Question is -
>
> Can this be done using UDF ? Since I'm passing column values to the UDF, I
> have no way to check if a particular KPI of type 'critical' is available in
> the dataframe ?
>
> Any suggestions on the best way to solve this problem ?
> tia!
>
>


Structured Streaming + UDF - logic based on checking if a column is present in the Dataframe

2022-02-23 Thread karan alang
Hello All,

I'm using Structured Streaming and am trying to use a UDF to parse each row.
Here is the requirement:

   - we can get alerts for a particular KPI with type 'major' OR 'critical'
   - for a KPI, if we get an alert of type 'major' (e.g. _major) and we also have a
     critical alert (_critical), we need to ignore the _major alert and consider
     the _critical alert only

There are ~25 alerts which are stored in the array (AlarmKeys.alarm_all)

UDF Code (draft):

@udf(returnType=StringType())
def convertStructToStr(APP_CAUSE, tenantName, window,,__major,__major, __critical, five__major, __critical):

    res = "{window: "+ str(window) + "type: 10m, applianceName: "+ str(APP_CAUSE)+","
    first = True
    for curr_alarm in AlarmKeys.alarms_all:
        alsplit = curr_alarm.split('__')
        if len(alsplit) == 2:
            # Only account for critical row if both major & critical are there
            if alsplit[1] == 'major':
                critical_alarm = alsplit[0] + "__critical"
                if int(col(critical_alarm)) > 0:
                    continue
        if int(col(curr_alarm)) > 0:
            if first:
                mystring = "{} {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
                first = False
            else:
                mystring = "{}, {}({})".format(mystring, AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
    res += "insight: " + mystring + "}"

# structured streaming using udf, this is printing data on console
# eventually, i'll put data into Kafka instead
df.select(convertStructToStr(*df.columns)) \
    .write \
    .format("console") \
    .option("numRows", 100) \
    .option("checkpointLocation", "/Users/karanalang/PycharmProjects/Kafka/checkpoint") \
    .option("outputMode", "complete") \
    .save("output")

Additional Details in stackoverflow :
https://stackoverflow.com/questions/71243726/structured-streaming-udf-logic-based-on-checking-if-a-column-is-present-in-t


The question is:

Can this be done using a UDF? Since I'm passing column values to the UDF, I
have no way to check whether a particular KPI of type 'critical' is present in
the DataFrame.

Any suggestions on the best way to solve this problem?
tia!
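
One way to handle this, shown below as a minimal sketch rather than the poster's actual code: the set of alert columns is known on the driver via df.columns, so the major-vs-critical decision can be made against that set, and the whole row can be handed to the UDF as a struct. The AlarmKeys class here is only a placeholder mirroring the draft; the real class and df come from the poster's job.

    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    class AlarmKeys:  # illustrative stand-in for the real AlarmKeys
        alarms_all = ["cpu__major", "cpu__critical", "mem__major"]
        alarm_exp = {"cpu__major": "CPU major", "cpu__critical": "CPU critical",
                     "mem__major": "Memory major"}

    def build_insight_udf(present_cols):
        """present_cols: the columns actually present in the (streaming) DataFrame."""
        @F.udf(returnType=StringType())
        def convert(row):
            parts = []
            for alarm in AlarmKeys.alarms_all:
                if alarm not in present_cols:
                    continue
                base, _, severity = alarm.partition("__")
                # ignore the major alert when the matching critical column exists and fired
                if severity == "major":
                    crit = base + "__critical"
                    if crit in present_cols and (row[crit] or 0) > 0:
                        continue
                if (row[alarm] or 0) > 0:
                    parts.append("{}({})".format(AlarmKeys.alarm_exp[alarm], row[alarm]))
            return "{window: %s, type: 10m, insight: %s}" % (row["window"], ", ".join(parts))
        return convert

    insight_udf = build_insight_udf(set(df.columns))
    out = df.withColumn("insight", insight_udf(F.struct(*df.columns)))

For a streaming DataFrame the console sink would then go through out.writeStream.format("console").option("checkpointLocation", "...").start() rather than the batch write shown in the draft.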


Re: Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Jason Nerothin
*I guess I was focusing on this:*

#2
I want to do the above as a event driven way, *without using the batches*
(i tried micro batches, but I realised that’s not what I want), i.e., *for
each arriving event or as soon as a event message come my stream, not by
accumulating the event *

If you want to update your graph without pulling the older data back
through the entire DAG, it seems like you need to store the graph data
somewhere. So that's why I jumped to accumulators - the state would be
around from event to event, and not require a "reaggregation" for each
event.

Arbitrary stateful streaming has this ability "built in" - that is, the
engine stores your intermediate results in RAM and then the next event
picks up where the last one left off.

I've just implemented the arbitrary stateful streaming option... Couldn't
figure out a better way of avoiding the re-shuffle, so ended up keeping the
prior state in the engine.

I'm not using GraphX, but it seems like the approach should work regardless -
there's an interface called GroupState that you hand an iterator off to from
call to call.

Do keep in mind that you have to think about out of order event arrivals...

Send me a message to my direct email and I'll provide a link to the
source... Not sure I'm fully grokking your entire use case...


On Fri, Apr 5, 2019 at 1:15 PM Basavaraj  wrote:

> I have checked broadcast of accumulated values, but not arbitrary stateful
> streaming
>
> But, I am not sure how that helps here
>
> On Fri, 5 Apr 2019, 10:13 pm Jason Nerothin, 
> wrote:
>
>> Have you looked at Arbitrary Stateful Streaming and Broadcast
>> Accumulators?
>>
>> On Fri, Apr 5, 2019 at 10:55 AM Basavaraj  wrote:
>>
>>> Hi
>>>
>>> Have two questions
>>>
>>> #1
>>> I am trying to process events in realtime, outcome of the processing has
>>> to find a node in the GraphX and update that node as well (in case if any
>>> anomaly or state change), If a node is updated, I have to update the
>>> related nodes as well, want to know if GraphX can help in this by providing
>>> some native support
>>>
>>> #2
>>> I want to do the above as a event driven way, without using the batches
>>> (i tried micro batches, but I realised that’s not what I want), i.e., for
>>> each arriving event or as soon as a event message come my stream, not by
>>> accumulating the event
>>>
>>> I humbly welcome any pointers, constructive criticism
>>>
>>> Regards
>>> Basav
>>> - To
>>> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>>
>> --
>> Thanks,
>> Jason
>>
>

-- 
Thanks,
Jason


Re: Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Basavaraj
I have checked broadcast of accumulated values, but not arbitrary stateful
streaming

But, I am not sure how that helps here

On Fri, 5 Apr 2019, 10:13 pm Jason Nerothin, 
wrote:

> Have you looked at Arbitrary Stateful Streaming and Broadcast Accumulators?
>
> On Fri, Apr 5, 2019 at 10:55 AM Basavaraj  wrote:
>
>> Hi
>>
>> Have two questions
>>
>> #1
>> I am trying to process events in realtime, outcome of the processing has
>> to find a node in the GraphX and update that node as well (in case if any
>> anomaly or state change), If a node is updated, I have to update the
>> related nodes as well, want to know if GraphX can help in this by providing
>> some native support
>>
>> #2
>> I want to do the above as a event driven way, without using the batches
>> (i tried micro batches, but I realised that’s not what I want), i.e., for
>> each arriving event or as soon as a event message come my stream, not by
>> accumulating the event
>>
>> I humbly welcome any pointers, constructive criticism
>>
>> Regards
>> Basav
>> - To
>> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
> --
> Thanks,
> Jason
>


Re: Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Jason Nerothin
Have you looked at Arbitrary Stateful Streaming and Broadcast Accumulators?

On Fri, Apr 5, 2019 at 10:55 AM Basavaraj  wrote:

> Hi
>
> Have two questions
>
> #1
> I am trying to process events in realtime, outcome of the processing has
> to find a node in the GraphX and update that node as well (in case if any
> anomaly or state change), If a node is updated, I have to update the
> related nodes as well, want to know if GraphX can help in this by providing
> some native support
>
> #2
> I want to do the above as a event driven way, without using the batches (i
> tried micro batches, but I realised that’s not what I want), i.e., for each
> arriving event or as soon as a event message come my stream, not by
> accumulating the event
>
> I humbly welcome any pointers, constructive criticism
>
> Regards
> Basav
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org



-- 
Thanks,
Jason



Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Basavaraj
Hi

Have two questions 

#1
I am trying to process events in real time. The outcome of the processing has to
find a node in the GraphX graph and update that node (in case of any anomaly or
state change). If a node is updated, I have to update the related nodes as well.
I want to know if GraphX can help with this by providing some native support.

#2
I want to do the above in an event-driven way, without using batches (I tried
micro-batches, but realised that's not what I want), i.e., processing each event
as soon as its message arrives on my stream, not by accumulating events.

I humbly welcome any pointers, constructive criticism 

Regards
Basav



RE: Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
OK, it turned out to be a bit less complicated than I thought :). I would be
interested to know whether creating a temporary table from the DF and pushing the
data into Hive is the best option here.

 

1. Prepare and clean up data with filter & map
2. Convert the RDD to DF
3. Create temporary table from DF
4. Use Hive database
5. Drop if exists and create ORC table in Hive database
6. Simple insert/select from temporary table to Hive table

 

//
// Get a DF first based on the Databricks CSV library
//
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("/data/stg/table2")
//
// Next filter out empty rows (last column has to be > "") and get rid of the "?"
// special character. Also get rid of "," in money fields.
// Example csv cell "£2,500.00" --> needs to be transformed to plain 2500.00
//
val a = df.filter(col("Total") > "").map(x => (x.getString(0), x.getString(1),
  x.getString(2).substring(1).replace(",", "").toDouble,
  x.getString(3).substring(1).replace(",", "").toDouble,
  x.getString(4).substring(1).replace(",", "").toDouble))

a.first
//
// Convert this RDD to a DF and create a Spark temporary table
//
a.toDF.registerTempTable("tmp")
//
// Need to create and populate the target ORC table t3 in database test in Hive
//
sql("use test")
//
// Drop and create table t3
//
sql("DROP TABLE IF EXISTS t3")
var sqltext : String = ""
sqltext = """
CREATE TABLE t3 (
  INVOICENUMBER  INT
 ,PAYMENTDATE    timestamp
 ,NET            DECIMAL(20,2)
 ,VAT            DECIMAL(20,2)
 ,TOTAL          DECIMAL(20,2)
)
COMMENT 'from csv file from excel sheet'
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="ZLIB" )
"""
sql(sqltext)
//
// Put the data into the Hive table. Clean-up is already done
//
sqltext = "INSERT INTO t3 SELECT * FROM tmp"
sql(sqltext)
sql("SELECT * FROM t3 ORDER BY 1").collect.foreach(println)
sys.exit()
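
On the question of whether the temporary table plus INSERT/SELECT is the best option: a possible alternative, sketched below on the assumption of a Hive-enabled context and the cleaned RDD `a` built above, is to write the DataFrame straight into the Hive database with saveAsTable. Whether Hive itself reads the resulting table cleanly depends on the Spark version and table format, which is one reason the temporary-table route above remains a perfectly reasonable choice.

    // Alternative sketch only (assumes a Hive-enabled sqlContext and the cleaned RDD `a`
    // from above); the column names given to toDF are illustrative.
    val cleaned = a.toDF("invoicenumber", "paymentdate", "net", "vat", "total")

    sql("use test")
    cleaned.write
      .mode("overwrite")      // or "append" for incremental loads
      .format("orc")
      .saveAsTable("t3")      // lands in the database selected above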

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/> 

 


 

 

From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: 20 February 2016 12:33
To: Mich Talebzadeh 
Cc: Michał Zieliński ; user @spark 

Subject: Re: Checking for null values when mapping

 

For #2, you can filter out row whose first column has length 0. 

 

Cheers


On Feb 20, 2016, at 6:59 AM, Mich Talebzadeh mailto:m...@peridale.co.uk> > wrote:

Thanks

 

 

So what I did was

 

scala> val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date: 
string, Net: string, VAT: string, Total: string]

 

 

scala> df.printSchema

root

|-- Invoice Number: string (nullable = true)

|-- Payment date: string (nullable = true)

|-- Net: string (nullable = true)

|-- VAT: string (nullable = true)

|-- Total: string (nullable = true)

 

 

So all the columns are Strings

 

Then I tried to exclude null rows by filtering on all columns not being null 
and map the rest

 

scala> val a = df.filter(col("Invoice Number").isNotNull and col("Payment 
date").isNotNull and col("Net").isNotNull and col("VAT").isNotNull and 
col("Total").isNotNull).map(x => 
(x.getString(1),x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

a: org.apache.spark.rdd.RDD[(String, Double, Double, Double)] = 
MapPartitionsRDD[176] at map at :21

 

This still comes up with “String index out of range: “ error

 

16/02/20 11:50:51 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 18)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

My questions are:

 

1.  Doing the map,  map(x =>

Re: Checking for null values when mapping

2016-02-20 Thread Chandeep Singh
Ah. Ok.

> On Feb 20, 2016, at 2:31 PM, Mich Talebzadeh  wrote:
> 
> Yes I did that as well but no joy. My shell does it for windows files 
> automatically
>  
> Thanks, 
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>  
>  
>  
> From: Chandeep Singh [mailto:c...@chandeep.com] 
> Sent: 20 February 2016 14:27
> To: Mich Talebzadeh 
> Cc: user @spark 
> Subject: Re: Checking for null values when mapping
>  
> Also, have you looked into Dos2Unix (http://dos2unix.sourceforge.net/ 
> <http://dos2unix.sourceforge.net/>)
>  
> Has helped me in the past to deal with special characters while using windows 
> based CSV’s in Linux. (Might not be the solution here.. Just an FYI :))
>  
>> On Feb 20, 2016, at 2:17 PM, Chandeep Singh > <mailto:c...@chandeep.com>> wrote:
>>  
>> Understood. In that case Ted’s suggestion to check the length should solve 
>> the problem.
>>  
>>> On Feb 20, 2016, at 2:09 PM, Mich Talebzadeh >> <mailto:m...@peridale.co.uk>> wrote:
>>>  
>>> Hi,
>>>  
>>> That is a good question.
>>>  
>>> When data is exported from CSV to Linux, any character that cannot be 
>>> transformed is replaced by ?. That question mark is not actually the 
>>> expected “?” J
>>>  
>>> So the only way I can get rid of it is by drooping the first character 
>>> using substring(1). I checked I did the same in Hive sql
>>>  
>>> The actual field in CSV is “£2,500.oo” that translates into “?2,500.00”
>>>  
>>> HTH
>>>  
>>>  
>>> Dr Mich Talebzadeh
>>>  
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>  
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>>>  
>>> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>>>  
>>>  
>>>  
>>> From: Chandeep Singh [mailto:c...@chandeep.com <mailto:c...@chandeep.com>] 
>>> Sent: 20 February 2016 13:47
>>> To: Mich Talebzadeh mailto:m...@peridale.co.uk>>
>>> Cc: user @spark mailto:user@spark.apache.org>>
>>> Subject: Re: Checking for null values when mapping
>>>  
>>> Looks like you’re using substring just to get rid of the ‘?’. Why not use 
>>> replace for that as well? And then you wouldn’t run into issues with index 
>>> out of bound.
>>>  
>>> val a = "?1,187.50"  
>>> val b = ""
>>>  
>>> println(a.substring(1).replace(",", "”))
>>> —> 1187.50
>>>  
>>> println(a.replace("?", "").replace(",", "”))
>>> —> 1187.50
>>>  
>>> println(b.replace("?", "").replace(",", "”))
>>> —> No error / output since both ‘?' and ‘,' don’t exist.
>>>  
>>>  
>>>> On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh >>> <mailto:m...@peridale.co.uk>> wrote:
>>>>  
>>>>  
>>&g

Re: Checking for null values when mapping

2016-02-20 Thread Chandeep Singh
Also, have you looked into Dos2Unix (http://dos2unix.sourceforge.net/ 
<http://dos2unix.sourceforge.net/>)

Has helped me in the past to deal with special characters while using windows 
based CSV’s in Linux. (Might not be the solution here.. Just an FYI :))

> On Feb 20, 2016, at 2:17 PM, Chandeep Singh  wrote:
> 
> Understood. In that case Ted’s suggestion to check the length should solve 
> the problem.
> 
>> On Feb 20, 2016, at 2:09 PM, Mich Talebzadeh > <mailto:m...@peridale.co.uk>> wrote:
>> 
>> Hi,
>>  
>> That is a good question.
>>  
>> When data is exported from CSV to Linux, any character that cannot be 
>> transformed is replaced by ?. That question mark is not actually the 
>> expected “?” J
>>  
>> So the only way I can get rid of it is by drooping the first character using 
>> substring(1). I checked I did the same in Hive sql
>>  
>> The actual field in CSV is “£2,500.oo” that translates into “?2,500.00”
>>  
>> HTH
>>  
>>  
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>>  
>> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>>  
>>  
>>  
>> From: Chandeep Singh [mailto:c...@chandeep.com <mailto:c...@chandeep.com>] 
>> Sent: 20 February 2016 13:47
>> To: Mich Talebzadeh mailto:m...@peridale.co.uk>>
>> Cc: user @spark mailto:user@spark.apache.org>>
>> Subject: Re: Checking for null values when mapping
>>  
>> Looks like you’re using substring just to get rid of the ‘?’. Why not use 
>> replace for that as well? And then you wouldn’t run into issues with index 
>> out of bound.
>>  
>> val a = "?1,187.50"  
>> val b = ""
>>  
>> println(a.substring(1).replace(",", "”))
>> —> 1187.50
>>  
>> println(a.replace("?", "").replace(",", "”))
>> —> 1187.50
>>  
>> println(b.replace("?", "").replace(",", "”))
>> —> No error / output since both ‘?' and ‘,' don’t exist.
>>  
>>  
>>> On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh >> <mailto:m...@peridale.co.uk>> wrote:
>>>  
>>>  
>>> I have a DF like below reading a csv file
>>>  
>>>  
>>> val df = 
>>> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
>>> "true").option("header", "true").load("/data/stg/table2")
>>>  
>>> val a = df.map(x => (x.getString(0), x.getString(1), 
>>> x.getString(2).substring(1).replace(",", 
>>> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
>>> x.getString(4).substring(1).replace(",", "").toDouble))
>>>  
>>>  
>>> For most rows I am reading from csv file the above mapping works fine. 
>>> However, at the bottom of csv there are couple of empty columns as below
>>>  
>>> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
>>> []
>>> [Net income,,?182,531.25,?14,606.25,?197,137.50]
>>> []
>>> [year 2014,,?113,500.00,?0.00,?113,500.00]
>>> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>>>  
>>> However, I get 
>>>  
>>> a.collect.foreach(println)
>>> 16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 
>>> 161)
>>> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>>>  
>>> I suspect the cause is substring operation  say x.getString(2).substring(1) 
>>> on empty values that according to web will throw this type of error
>>>  
>>>  
>>> The easiest solutio

RE: Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
Yes I did that as well but no joy. My shell does it for windows files 
automatically

 

Thanks, 

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/> 

 


 

 

From: Chandeep Singh [mailto:c...@chandeep.com] 
Sent: 20 February 2016 14:27
To: Mich Talebzadeh 
Cc: user @spark 
Subject: Re: Checking for null values when mapping

 

Also, have you looked into Dos2Unix (http://dos2unix.sourceforge.net/)

 

Has helped me in the past to deal with special characters while using windows 
based CSV’s in Linux. (Might not be the solution here.. Just an FYI :))

 

On Feb 20, 2016, at 2:17 PM, Chandeep Singh mailto:c...@chandeep.com> > wrote:

 

Understood. In that case Ted’s suggestion to check the length should solve the 
problem.

 

On Feb 20, 2016, at 2:09 PM, Mich Talebzadeh mailto:m...@peridale.co.uk> > wrote:

 

Hi,

 

That is a good question.

 

When data is exported from CSV to Linux, any character that cannot be 
transformed is replaced by ?. That question mark is not actually the expected 
“?” :)

 

So the only way I can get rid of it is by drooping the first character using 
substring(1). I checked I did the same in Hive sql

 

The actual field in CSV is “£2,500.oo” that translates into “?2,500.00”

 

HTH

 

 

Dr Mich Talebzadeh

 

LinkedIn   
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

 <http://talebzadehmich.wordpress.com/> http://talebzadehmich.wordpress.com

 


 

 

From: Chandeep Singh [mailto:c...@chandeep.com] 
Sent: 20 February 2016 13:47
To: Mich Talebzadeh mailto:m...@peridale.co.uk> >
Cc: user @spark mailto:user@spark.apache.org> >
Subject: Re: Checking for null values when mapping

 

Looks like you’re using substring just to get rid of the ‘?’. Why not use 
replace for that as well? And then you wouldn’t run into issues with index out 
of bound.

 

val a = "?1,187.50"  

val b = ""

 

println(a.substring(1).replace(",", "”))

—> 1187.50

 

println(a.replace("?", "").replace(",", "”))

—> 1187.50

 

println(b.replace("?", "").replace(",", "”))

—> No error / output since both ‘?' and ‘,' don’t exist.

 

 

On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh < <mailto:m...@peridale.co.uk> 
m...@peridale.co.uk> wrote:

 

 

I have a DF like below reading a csv file

 

 

val df = 
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

 

val a = df.map(x => (x.getString(0), x.getString(1), 
x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

 

For most rows I am reading from csv file the above mapping works fine. However, 
at the bottom of csv there are couple of empty columns as below

 

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]

[]

[Net income,,?182,531.25,?14,606.25,?197,137.50]

[]

[year 2014,,?113,500.00,?0.00,?113,500.00]

[Year 2015,,?69,031.25,?14,606.25,?83,637.50]

 

However, I get 

 

a.collect.foreach(println)

16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 161)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

I suspect the cause is substring operation  say x.getString(2).substring(1) on 
empty values that according to web will thr

Re: Checking for null values when mapping

2016-02-20 Thread Chandeep Singh
Understood. In that case Ted’s suggestion to check the length should solve the 
problem.

> On Feb 20, 2016, at 2:09 PM, Mich Talebzadeh  wrote:
> 
> Hi,
>  
> That is a good question.
>  
> When data is exported from CSV to Linux, any character that cannot be 
> transformed is replaced by ?. That question mark is not actually the expected 
> “?” J
>  
> So the only way I can get rid of it is by drooping the first character using 
> substring(1). I checked I did the same in Hive sql
>  
> The actual field in CSV is “£2,500.oo” that translates into “?2,500.00”
>  
> HTH
>  
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>  
> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>  
>  
>  
> From: Chandeep Singh [mailto:c...@chandeep.com] 
> Sent: 20 February 2016 13:47
> To: Mich Talebzadeh 
> Cc: user @spark 
> Subject: Re: Checking for null values when mapping
>  
> Looks like you’re using substring just to get rid of the ‘?’. Why not use 
> replace for that as well? And then you wouldn’t run into issues with index 
> out of bound.
>  
> val a = "?1,187.50"  
> val b = ""
>  
> println(a.substring(1).replace(",", "”))
> —> 1187.50
>  
> println(a.replace("?", "").replace(",", "”))
> —> 1187.50
>  
> println(b.replace("?", "").replace(",", "”))
> —> No error / output since both ‘?' and ‘,' don’t exist.
>  
>  
>> On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh > <mailto:m...@peridale.co.uk>> wrote:
>>  
>>  
>> I have a DF like below reading a csv file
>>  
>>  
>> val df = 
>> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
>> "true").option("header", "true").load("/data/stg/table2")
>>  
>> val a = df.map(x => (x.getString(0), x.getString(1), 
>> x.getString(2).substring(1).replace(",", 
>> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
>> x.getString(4).substring(1).replace(",", "").toDouble))
>>  
>>  
>> For most rows I am reading from csv file the above mapping works fine. 
>> However, at the bottom of csv there are couple of empty columns as below
>>  
>> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
>> []
>> [Net income,,?182,531.25,?14,606.25,?197,137.50]
>> []
>> [year 2014,,?113,500.00,?0.00,?113,500.00]
>> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>>  
>> However, I get 
>>  
>> a.collect.foreach(println)
>> 16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 
>> 161)
>> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>>  
>> I suspect the cause is substring operation  say x.getString(2).substring(1) 
>> on empty values that according to web will throw this type of error
>>  
>>  
>> The easiest solution seems to be to check whether x above is not null and do 
>> the substring operation. Can this be done without using a UDF?
>>  
>> Thanks
>>  
>> Dr Mich Talebzadeh
>>  
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>  
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>
>>  
>> http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/>
>>  



RE: Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
Hi,

 

That is a good question.

 

When data is exported from CSV to Linux, any character that cannot be
transformed is replaced by ?. That question mark is not actually the expected
"?" :)

So the only way I can get rid of it is by dropping the first character using
substring(1). I checked that I did the same in the Hive SQL.

The actual field in CSV is "£2,500.00", which translates into "?2,500.00".

 

HTH

 

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/> 

 


 

 

From: Chandeep Singh [mailto:c...@chandeep.com] 
Sent: 20 February 2016 13:47
To: Mich Talebzadeh 
Cc: user @spark 
Subject: Re: Checking for null values when mapping

 

Looks like you’re using substring just to get rid of the ‘?’. Why not use 
replace for that as well? And then you wouldn’t run into issues with index out 
of bound.

 

val a = "?1,187.50"  

val b = ""

 

println(a.substring(1).replace(",", "”))

—> 1187.50

 

println(a.replace("?", "").replace(",", "”))

—> 1187.50

 

println(b.replace("?", "").replace(",", "”))

—> No error / output since both ‘?' and ‘,' don’t exist.

 

 

On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh mailto:m...@peridale.co.uk> > wrote:

 

 

I have a DF like below reading a csv file

 

 

val df = 
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

 

val a = df.map(x => (x.getString(0), x.getString(1), 
x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

 

For most rows I am reading from csv file the above mapping works fine. However, 
at the bottom of csv there are couple of empty columns as below

 

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]

[]

[Net income,,?182,531.25,?14,606.25,?197,137.50]

[]

[year 2014,,?113,500.00,?0.00,?113,500.00]

[Year 2015,,?69,031.25,?14,606.25,?83,637.50]

 

However, I get 

 

a.collect.foreach(println)

16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 161)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

I suspect the cause is substring operation  say x.getString(2).substring(1) on 
empty values that according to web will throw this type of error

 

 

The easiest solution seems to be to check whether x above is not null and do 
the substring operation. Can this be done without using a UDF?

 

Thanks

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/> 

 


 



Re: Checking for null values when mapping

2016-02-20 Thread Chandeep Singh
Looks like you’re using substring just to get rid of the ‘?’. Why not use 
replace for that as well? And then you wouldn’t run into issues with index out 
of bound.

val a = "?1,187.50"
val b = ""

println(a.substring(1).replace(",", ""))
—> 1187.50

println(a.replace("?", "").replace(",", ""))
—> 1187.50

println(b.replace("?", "").replace(",", ""))
—> No error / output since both '?' and ',' don't exist.
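
Applying that replace-based cleaning inside the original map would look roughly like the sketch below (column positions as in the thread). Note that the completely empty [] rows will still fail at toDouble, so the length-based filter suggested elsewhere in this thread is still needed.

    // Sketch only: replace() instead of substring(1), same column layout as in the thread.
    // Empty rows still need to be filtered out before toDouble.
    val a = df.map(x => (x.getString(0), x.getString(1),
      x.getString(2).replace("?", "").replace(",", "").toDouble,
      x.getString(3).replace("?", "").replace(",", "").toDouble,
      x.getString(4).replace("?", "").replace(",", "").toDouble))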


> On Feb 20, 2016, at 8:24 AM, Mich Talebzadeh  wrote:
> 
>  
> I have a DF like below reading a csv file
>  
>  
> val df = 
> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", "true").load("/data/stg/table2")
>  
> val a = df.map(x => (x.getString(0), x.getString(1), 
> x.getString(2).substring(1).replace(",", 
> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
> x.getString(4).substring(1).replace(",", "").toDouble))
>  
>  
> For most rows I am reading from csv file the above mapping works fine. 
> However, at the bottom of csv there are couple of empty columns as below
>  
> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
> []
> [Net income,,?182,531.25,?14,606.25,?197,137.50]
> []
> [year 2014,,?113,500.00,?0.00,?113,500.00]
> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>  
> However, I get 
>  
> a.collect.foreach(println)
> 16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 
> 161)
> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>  
> I suspect the cause is substring operation  say x.getString(2).substring(1) 
> on empty values that according to web will throw this type of error
>  
>  
> The easiest solution seems to be to check whether x above is not null and do 
> the substring operation. Can this be done without using a UDF?
>  
> Thanks
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
>  



Re: Checking for null values when mapping

2016-02-20 Thread Ted Yu
For #2, you can filter out rows whose first column has length 0.

Cheers
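
A short sketch of that length-based filter, using the column names from the thread; getAs[String] also addresses the earlier question about referring to columns by name rather than by position.

    // Minimal sketch (not tested against the actual file): drop rows whose first column
    // is empty or null, then map by column name via getAs[String].
    import org.apache.spark.sql.functions.{col, length}

    val cleaned = df
      .filter(length(col("Invoice Number")) > 0)
      .map(x => (x.getAs[String]("Invoice Number"),
                 x.getAs[String]("Payment date"),
                 x.getAs[String]("Net").substring(1).replace(",", "").toDouble,
                 x.getAs[String]("VAT").substring(1).replace(",", "").toDouble,
                 x.getAs[String]("Total").substring(1).replace(",", "").toDouble))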

> On Feb 20, 2016, at 6:59 AM, Mich Talebzadeh  wrote:
> 
> Thanks
>  
>  
> So what I did was
>  
> scala> val df = 
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", "true").load("/data/stg/table2")
> df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date: 
> string, Net: string, VAT: string, Total: string]
>  
>  
> scala> df.printSchema
> root
> |-- Invoice Number: string (nullable = true)
> |-- Payment date: string (nullable = true)
> |-- Net: string (nullable = true)
> |-- VAT: string (nullable = true)
> |-- Total: string (nullable = true)
>  
>  
> So all the columns are Strings
>  
> Then I tried to exclude null rows by filtering on all columns not being null 
> and map the rest
>  
> scala> val a = df.filter(col("Invoice Number").isNotNull and col("Payment 
> date").isNotNull and col("Net").isNotNull and col("VAT").isNotNull and 
> col("Total").isNotNull).map(x => 
> (x.getString(1),x.getString(2).substring(1).replace(",", 
> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
> x.getString(4).substring(1).replace(",", "").toDouble))
>  
> a: org.apache.spark.rdd.RDD[(String, Double, Double, Double)] = 
> MapPartitionsRDD[176] at map at :21
>  
> This still comes up with “String index out of range: “ error
>  
> 16/02/20 11:50:51 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 18)
> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>  
> My questions are:
>  
> 1.Doing the map,  map(x => (x.getString(1)  -- Can I replace 
> x.getString(1) with the actual column name say “Invoice Number” and so forth 
> for other columns as well?
> 2.   Sounds like it crashes because of these columns below at the end
> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]  \\ example good one
> [] \\ bad one, empty one
> [Net income,,?182,531.25,?14,606.25,?197,137.50]
> [] \\ bad one, empty one
> [year 2014,,?113,500.00,?0.00,?113,500.00]
> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>  
> 3.Also to clarify I want to drop those two empty line -> []  if I 
> can. Unfortunately  drop() call does not work
> a.drop()
> :24: error: value drop is not a member of 
> org.apache.spark.rdd.RDD[(String, Double, Double, Double)]
>   a.drop()
> ^
>  
> Thanka again,
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
>  
>  
> From: Michał Zieliński [mailto:zielinski.mich...@gmail.com] 
> Sent: 20 February 2016 08:59
> To: Mich Talebzadeh 
> Cc: user @spark 
> Subject: Re: Checking for null values when mapping
>  
> You can use filter and isNotNull on Column before the map.
>  
> On 20 February 2016 at 08:24, Mich Talebzadeh  wrote:
>  
> I have a DF like below reading a csv file
>  
>  
> val df = 
> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
> "true").option("header", "true").load("/data/stg/table2")
>  
> val a = df.map(x => (x.getString(0), x.getString(1), 
> x.getString(2).substring(1).replace(",", 
> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
> x.getString(4).substring(1).replace(",", "").toDouble))
>  
>  
> For most rows I am reading from csv file the above mapping works fine. 
> However, at the bottom of csv there are couple of empty columns as below
>  
> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
> []
> [Net income,,?182,531.25,?14,606.25,?197,137.50]
> []
> [year 2014,,?113,500.00,?0.00,?113,500.00]
> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>  
> H

RE: Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
Thanks

 

 

So what I did was

 

scala> val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date: 
string, Net: string, VAT: string, Total: string]

 

 

scala> df.printSchema

root

|-- Invoice Number: string (nullable = true)

|-- Payment date: string (nullable = true)

|-- Net: string (nullable = true)

|-- VAT: string (nullable = true)

|-- Total: string (nullable = true)

 

 

So all the columns are Strings

 

Then I tried to exclude null rows by filtering on all columns not being null 
and map the rest

 

scala> val a = df.filter(col("Invoice Number").isNotNull and col("Payment 
date").isNotNull and col("Net").isNotNull and col("VAT").isNotNull and 
col("Total").isNotNull).map(x => 
(x.getString(1),x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

a: org.apache.spark.rdd.RDD[(String, Double, Double, Double)] = 
MapPartitionsRDD[176] at map at :21

 

This still comes up with “String index out of range: “ error

 

16/02/20 11:50:51 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 18)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

My questions are:

1. Doing the map, map(x => (x.getString(1) -- can I replace x.getString(1)
   with the actual column name, say "Invoice Number", and so forth for the other
   columns as well?

2. It sounds like it crashes because of these rows at the end:

   [421,02/10/2015,?1,187.50,?237.50,?1,425.00]  \\ example good one
   [] \\ bad one, empty one
   [Net income,,?182,531.25,?14,606.25,?197,137.50]
   [] \\ bad one, empty one
   [year 2014,,?113,500.00,?0.00,?113,500.00]
   [Year 2015,,?69,031.25,?14,606.25,?83,637.50]

3. Also, to clarify, I want to drop those two empty lines -> [] if I can.
   Unfortunately the drop() call does not work:

   a.drop()
   :24: error: value drop is not a member of
   org.apache.spark.rdd.RDD[(String, Double, Double, Double)]
         a.drop()
         ^

Thanks again,

 

Dr Mich Talebzadeh

 

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com <http://talebzadehmich.wordpress.com/> 

 


 

 

From: Michał Zieliński [mailto:zielinski.mich...@gmail.com] 
Sent: 20 February 2016 08:59
To: Mich Talebzadeh 
Cc: user @spark 
Subject: Re: Checking for null values when mapping

 

You can use filter and isNotNull on  
<https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.Column> 
Column before the map.

 

On 20 February 2016 at 08:24, Mich Talebzadeh mailto:m...@peridale.co.uk> > wrote:

 

I have a DF like below reading a csv file

 

 

val df = 
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")

 

val a = df.map(x => (x.getString(0), x.getString(1), 
x.getString(2).substring(1).replace(",", 
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble, 
x.getString(4).substring(1).replace(",", "").toDouble))

 

 

For most rows I am reading from csv file the above mapping works fine. However, 
at the bottom of csv there are couple of empty columns as below

 

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]

[]

[Net income,,?182,531.25,?14,606.25,?197,137.50]

[]

[year 2014,,?113,500.00,?0.00,?113,500.00]

[Year 2015,,?69,031.25,?14,606.25,?83,637.50]

 

However, I get 

 

a.collect.foreach(println)

16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID 161)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

I suspect the cause is substring operation  say x.getString(2).substring(1) on 
empty values that according to web will throw this type of error

 

 

The easiest solution seems to be to check whether x above is not null and do 
the substring operation. Can this be done without us

Re: Checking for null values when mapping

2016-02-20 Thread Michał Zieliński
You can use filter and isNotNull on Column before the map.

On 20 February 2016 at 08:24, Mich Talebzadeh  wrote:

>
>
> I have a DF like below reading a csv file
>
>
>
>
>
> val df =
> HiveContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
>
>
>
> val a = df.map(x => (x.getString(0), x.getString(1),
> *x.getString(2).substring(1)*.replace(",",
> "").toDouble,x.getString(3).substring(1).replace(",", "").toDouble,
> x.getString(4).substring(1).replace(",", "").toDouble))
>
>
>
>
>
> For most rows I am reading from csv file the above mapping works fine.
> However, at the bottom of csv there are couple of empty columns as below
>
>
>
> [421,02/10/2015,?1,187.50,?237.50,?1,425.00]
>
> []
>
> [Net income,,?182,531.25,?14,606.25,?197,137.50]
>
> []
>
> [year 2014,,?113,500.00,?0.00,?113,500.00]
>
> [Year 2015,,?69,031.25,?14,606.25,?83,637.50]
>
>
>
> However, I get
>
>
>
> a.collect.foreach(println)
>
> 16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0
> (TID 161)
>
> java.lang.StringIndexOutOfBoundsException: String index out of range: -1
>
>
>
> I suspect the cause is substring operation  say
> x.getString(2).substring(1) on empty values that according to web will
> throw this type of error
>
>
>
>
>
> The easiest solution seems to be to check whether x above is not null and
> do the substring operation. Can this be done without using a UDF?
>
>
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
>
>
>


Checking for null values when mapping

2016-02-20 Thread Mich Talebzadeh
 

I have a DF like below reading a csv file

 

 

val df =
HiveContext.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load("/data/stg/table2")

 

val a = df.map(x => (x.getString(0), x.getString(1),
x.getString(2).substring(1).replace(",",
"").toDouble,x.getString(3).substring(1).replace(",", "").toDouble,
x.getString(4).substring(1).replace(",", "").toDouble))

 

 

For most rows I am reading from csv file the above mapping works fine.
However, at the bottom of csv there are couple of empty columns as below

 

[421,02/10/2015,?1,187.50,?237.50,?1,425.00]

[]

[Net income,,?182,531.25,?14,606.25,?197,137.50]

[]

[year 2014,,?113,500.00,?0.00,?113,500.00]

[Year 2015,,?69,031.25,?14,606.25,?83,637.50]

 

However, I get 

 

a.collect.foreach(println)

16/02/20 08:31:53 ERROR Executor: Exception in task 0.0 in stage 123.0 (TID
161)

java.lang.StringIndexOutOfBoundsException: String index out of range: -1

 

I suspect the cause is substring operation  say x.getString(2).substring(1)
on empty values that according to web will throw this type of error

 

 

The easiest solution seems to be to check whether x above is not null and do
the substring operation. Can this be done without using a UDF?

 

Thanks

 

Dr Mich Talebzadeh

 

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUr
V8Pw

 

  http://talebzadehmich.wordpress.com

 


 

 



Re: efficient checking the existence of an item in a rdd

2015-12-31 Thread Nick Peterson
The key to efficient lookups is having a partitioner in place.

If you don't have a partitioner in place, essentially the best you can do
is:
def contains[T](rdd: RDD[T], value: T): Boolean = ! (rdd.filter(x => x ==
value).isEmpty)

If you are going to do this sort of operation frequently, it might pay to
make it a bit easier. Rather than dealing with an RDD[T], deal with an RDD
of pairs; for instance, you could do pairRdd = rdd.map(x => (x, 1)) to get
an RDD[(T, Int)].

Now that these are (technically) key-value pairs, you can come up with a
partitioner and apply it; something like:

val numPartitions = pairRdd.partitions.length
val partitioner = new HashPartitioner(numPartitions)
val partitionedRdd = pairRdd.partitionBy(partitioner)

Now, you can use partitionedRdd.lookup(value), which will give you back a
sequence of all the values (in this case, 1's) associated with the key
"value".

You can use rdd.lookup on any RDD of key-value pairs.  However, if you look
at the source at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
you'll
see that it is very efficient in the case of having a good partitioner --
it only actually looks in the partition that must contain the given key.
This can make all the difference!
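
Putting the pieces above together, a minimal sketch of the keyed, hash-partitioned lookup (the helper names are placeholders, not an existing API):

    import scala.reflect.ClassTag
    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD

    // Build the keyed, hash-partitioned index once, then reuse it for many membership checks.
    def buildIndex[T: ClassTag](rdd: RDD[T]): RDD[(T, Int)] =
      rdd.map(x => (x, 1))
        .partitionBy(new HashPartitioner(rdd.partitions.length))
        .cache()

    // With a partitioner in place, lookup() only scans the partition that can hold the key.
    def contains[T: ClassTag](indexed: RDD[(T, Int)], value: T): Boolean =
      indexed.lookup(value).nonEmpty

Usage would be along the lines of val idx = buildIndex(bigRdd) followed by contains(idx, someValue) for each check; the one-off cost of the shuffle and cache only pays off when there are many lookups.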

-- Nick

On Thu, Dec 31, 2015 at 8:26 AM domibd  wrote:

> thanks a lot.
>
> It is very interesting.
>
> Unfortunatly it does not solve my very simple problem :
> efficiently find whether a value is in a huge rdd.
>
> thanks again
>
> Dominique
>
> On 31/12/2015 01:26, madaan.amanmadaan [via Apache Spark User List] wrote:
>
> > Hi,
> >
> > Check out https://github.com/amplab/spark-indexedrdd, might be helpful.
> >
> > Aman
> >
> > On Wed, Dec 30, 2015 at 12:13 PM, domibd [via Apache Spark User List]
> > <[hidden email] > wrote:
> >
> > hello,
> >
> > how can i check the existence of an item in a very large rdd
> > in a prallelised way such that the process stop as soon as
> > the item is found (if it is found)?
> >
> > thanks a lot
> >
> > Dominique
> >
> >
>
> >
> >
> >
> >
> >
> > --
> > -Aman
> >
> >
>


Re: efficient checking the existence of an item in a rdd

2015-12-31 Thread domibd
thanks a lot.

It is very interesting.

Unfortunately it does not solve my very simple problem:
efficiently finding whether a value is in a huge RDD.

thanks again

Dominique

Le 31/12/2015 01:26, madaan.amanmadaan [via Apache Spark User List] a 
écrit :
> Hi,
>
> Check out https://github.com/amplab/spark-indexedrdd, might be helpful.
>
> Aman
>
> On Wed, Dec 30, 2015 at 12:13 PM, domibd [via Apache Spark User List]
> <[hidden email] > wrote:
>
> hello,
>
> how can I check the existence of an item in a very large RDD
> in a parallelised way such that the process stops as soon as
> the item is found (if it is found)?
>
> thanks a lot
>
> Dominique
>
> --
> -Aman
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/efficient-checking-the-existence-of-an-item-in-a-rdd-tp25839p25845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: performance when checking if data frame is empty or not

2015-09-09 Thread Ted Yu
Have you tried:

df.rdd.isEmpty

Cheers
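
As an aside, a minimal guard around the join could look like this sketch
(df, otherDf and the "id" join column are assumed names, not from the original
thread):

// df.rdd.isEmpty builds on take(1), so it can stop as soon as any partition
// produces a row; df.count() typically has to scan the whole DataFrame.
val eitherEmpty = df.rdd.isEmpty || otherDf.rdd.isEmpty

if (!eitherEmpty) {
  df.join(otherDf, df("id") === otherDf("id")).show()
}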

On Tue, Sep 8, 2015 at 1:22 PM, Axel Dahl  wrote:

> I have a join that fails when one of the data frames is empty.
>
> To avoid this I am hoping to check if the dataframe is empty or not before
> the join.
>
> The question is what's the most performant way to do that?
>
> Should I use df.count(), df.first(), or something else?
>
> Thanks in advance,
>
> -Axel
>


performance when checking if data frame is empty or not

2015-09-08 Thread Axel Dahl
I have a join that fails when one of the data frames is empty.

To avoid this I am hoping to check if the dataframe is empty or not before
the join.

The question is what's the most performant way to do that?

Should I use df.count(), df.first(), or something else?

Thanks in advance,

-Axel


Incorrect ACL checking for partitioned table in Spark SQL-1.4

2015-06-16 Thread Karthik Subramanian
54)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4915)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:826)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:612)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)

at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy17.getListing(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:554)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy18.getListing(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
... 116 more




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incorrect-ACL-checking-for-partitioned-table-in-Spark-SQL-1-4-tp23355.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Checking Data Integrity in Spark

2015-03-27 Thread Arush Kharbanda
It's not possible to configure Spark to run these checks from XML alone; you
would need to write Spark jobs that implement the validations you need (a
rough sketch follows below).
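
A rough sketch of what such a job could look like (assuming Spark 1.4+ with a
SQLContext named sqlContext; the paths, column names and the two example rules
are made up, and the rules could just as well be built from your own XML/JSON
config):

import org.apache.spark.sql.functions._

val df = sqlContext.read.json("events.json")              // assumed input

// Example integrity rules: required column present, date in yyyy-MM-dd form.
val isValid = col("user_id").isNotNull &&
  col("event_date").rlike("""^\d{4}-\d{2}-\d{2}$""")

df.filter(isValid).write.parquet("clean/")      // records that pass the checks
df.filter(!isValid).write.parquet("rejects/")   // rejected records go elsewhere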

On Fri, Mar 27, 2015 at 5:13 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Hello,
>
> I want to check if there is any way to verify the data integrity of the
> data files. The use case is to perform data integrity checks on large files
> with 100+ columns and reject records (writing them to another file) that do
> not meet the criteria (such as NOT NULL, date format, etc.). Since there are
> a lot of columns/integrity rules, we should be able to drive the checks
> through configuration (XML, JSON, etc.); please share your thoughts.
>
>
> Thanks
>
> Sathish
>



-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Checking Data Integrity in Spark

2015-03-27 Thread Sathish Kumaran Vairavelu
Hello,

I want to check if there is any way to verify the data integrity of the data
files. The use case is to perform data integrity checks on large files with
100+ columns and reject records (writing them to another file) that do not
meet the criteria (such as NOT NULL, date format, etc.). Since there are a
lot of columns/integrity rules, we should be able to drive the checks through
configuration (XML, JSON, etc.); please share your thoughts.


Thanks

Sathish


Re: checking

2015-02-06 Thread Arush Kharbanda
Yes they are.

On Fri, Feb 6, 2015 at 5:06 PM, Mohit Durgapal 
wrote:

> Just wanted to know if my emails are reaching the user list.
>
>
> Regards
> Mohit
>



-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


checking

2015-02-06 Thread Mohit Durgapal
Just wanted to know if my emails are reaching the user list.


Regards
Mohit


Re: Checking spark cache percentage programatically. And how to clear cache.

2014-05-28 Thread Matei Zaharia
You can remove cached RDDs by calling unpersist() on them.

You can also use SparkContext.getRDDStorageInfo to get info on cache usage, 
though this is a developer API so it may change in future versions. We will add 
a standard API eventually but this is just very closely tied to framework 
internals.

Matei

On May 28, 2014, at 5:32 PM, Sung Hwan Chung  wrote:

> Hi,
> 
> Is there a programmatic way of checking whether an RDD has been 100% cached or
> not? I'd like to do this to have two different code paths.
> 
> Additionally, how do you clear cache (e.g. if you want to cache different 
> RDDs, and you'd like to clear an existing cached RDD).
> 
> Thanks!



Checking spark cache percentage programatically. And how to clear cache.

2014-05-28 Thread Sung Hwan Chung
Hi,

Is there a programmatic way of checking whether an RDD has been 100% cached or
not? I'd like to do this to have two different code paths.

Additionally, how do you clear cache (e.g. if you want to cache different
RDDs, and you'd like to clear an existing cached RDD).

Thanks!