Re: Executors idle, driver heap exploding and maxing only 1 cpu core

2019-05-23 Thread Nicholas Hakobian
One potential cause is the optimizer being a little overzealous in
deciding whether a table can be broadcast. Have you checked the UI or
query plan to see if any steps include a BroadcastHashJoin? It's possible
that the optimizer thinks the table should fit in memory based on its size
on disk, when it actually cannot. In this case you might want to look at
tuning spark.sql.autoBroadcastJoinThreshold.
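
To check whether an oversized broadcast is the culprit, one option is to
lower the threshold or switch automatic broadcast joins off. A minimal
PySpark sketch follows, assuming an existing SparkSession is passed in as
`spark`. The helper name set_broadcast_threshold is made up for
illustration; the configuration key is the real setting (a size in bytes,
where -1 disables automatic broadcasts).

```python
def set_broadcast_threshold(spark, max_bytes=-1):
    """Set the size limit (in bytes) below which Spark may broadcast a table.

    A value of -1 disables automatic broadcast joins altogether.
    `spark` is assumed to be an existing SparkSession.
    """
    key = "spark.sql.autoBroadcastJoinThreshold"
    # The value is passed as a string, which the runtime config accepts.
    spark.conf.set(key, str(max_bytes))
    return spark.conf.get(key)


# Hypothetical usage inside a running application or shell:
#   set_broadcast_threshold(spark)   # disable automatic broadcast joins
#   df.explain()                     # re-check the plan for BroadcastHashJoin
```

If the driver memory stops climbing with broadcasts disabled, the
threshold (or an explicit broadcast hint) is the likely place to tune.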

Another potential case is that, at the step where the driver looks like it
is "hanging", it's attempting to load a data source that is backed by a very
large number of files. Spark maintains a cache of file paths for a data
source to determine task splits, and we've seen the driver appear to hang
and/or crash if you try to load thousands (or more) of tiny files per
partition and you have a large number of partitions.

Hope this helps.

Nicholas Szandor Hakobian, Ph.D.
Principal Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Thu, May 23, 2019 at 7:36 AM Ashic Mahtab  wrote:

> Hi,
> We have a quite long winded Spark application we inherited with many
> stages. When we run on our spark cluster, things start off well enough.
> Workers are busy, lots of progress made, etc. etc. However, 30 minutes into
> processing, we see CPU usage of the workers drop drastically. At this time,
> we also see that the driver is maxing out exactly one core (though we've
> given it more than one), and its ram usage is creeping up. At this time,
> there's no logs coming out on the driver. Everything seems to stop, and
> then it suddenly starts working, and the workers start working again. The
> driver ram doesn't go down, but flatlines. A few minutes later, the same
> thing happens again - the world seems to stop. However, the driver soon
> crashes with an out of memory exception.
>
> What could be causing this sort of behaviour on the driver? We don't have
> any collect() or similar functions in the code. We're reading in from Azure
> blobs, processing, and writing back to Azure blobs. Where should we start
> in trying to get to the bottom of this? We're running Spark 2.4.1 in a
> stand-alone cluster.
>
> Thanks,
> Ashic.
>


Re: High level explanation of dropDuplicates

2019-05-20 Thread Nicholas Hakobian
From doing some searching around in the Spark codebase, I found the
following:

https://github.com/apache/spark/blob/163a6e298213f216f74f4764e241ee6298ea30b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1452-L1474

So it appears that, at least for batch queries, dropDuplicates (the
Deduplicate logical operation) is not executed directly. Instead, there is
an optimizer rule that rewrites it into an aggregation that is equivalent
to grouping by all the columns you want to deduplicate across (or all
columns if you are doing something like distinct), and taking the First()
value. So (using a PySpark code example):

df = input_df.dropDuplicates(['col1', 'col2'])

Is effectively shorthand for saying something like:

# needs: from pyspark.sql.functions import first, struct
df = (input_df.groupBy('col1', 'col2')
      .agg(first(struct(input_df.columns)).alias('data'))
      .select('data.*'))

Except I assume that it has some internal optimization so it doesn't need
to pack/unpack the column data, and just returns the whole Row.
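
The "group by the key columns and keep the first row" idea can be modelled
without Spark at all. The sketch below is plain Python over a list of
dicts, not Spark's implementation; the function name drop_duplicates and
the sample rows are made up for illustration. One difference worth noting:
this version keeps the first row in list order, whereas Spark makes no
promise about which row of a group survives.

```python
def drop_duplicates(rows, subset):
    """Keep one row per distinct combination of the `subset` keys.

    `rows` is a list of dicts standing in for DataFrame rows.
    """
    first_seen = {}
    for row in rows:
        key = tuple(row[c] for c in subset)
        # setdefault stores the row only if this key has not been seen yet.
        first_seen.setdefault(key, row)
    return list(first_seen.values())


rows = [
    {"col1": 1, "col2": "a", "payload": "x"},
    {"col1": 1, "col2": "a", "payload": "y"},
    {"col1": 2, "col2": "b", "payload": "z"},
]
deduped = drop_duplicates(rows, ["col1", "col2"])
```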

Nicholas Szandor Hakobian, Ph.D.
Principal Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com



On Mon, May 20, 2019 at 11:38 AM Yeikel  wrote:

> Hi ,
>
> I am looking for a high level explanation(overview) on how
> dropDuplicates[1]
> works.
>
> [1]
>
> https://github.com/apache/spark/blob/db24b04cad421ed508413d397c6beec01f723aee/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2326
>
> Could someone please explain?
>
> Thank you
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Pyspark 2.3] Logical operators (and/or) in pyspark

2019-05-13 Thread Nicholas Hakobian
I threw together a quick example that replicates what you see, then looked
at the physical plan:

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row

# `contains` is the UDF from the original question (quoted below).
df = spark.createDataFrame([
    Row(list_names=['a', 'b', 'c', 'd'], name=None),
    Row(list_names=['a', 'b', 'c', 'd'], name='a'),
])
df2 = df.withColumn(
    'match_flag',
    col('list_names').isNull() | contains(col('name'), col('list_names')))

Running df2.show() returns the error you mentioned. However, if you look at
the query plan you see the following:

== Physical Plan ==
*(1) Project [list_names#27, name#28, (isnull(list_names#27) || pythonUDF0#47) AS match_flag#32]
+- BatchEvalPython [(name#28, list_names#27)], [list_names#27, name#28, pythonUDF0#47]
   +- Scan ExistingRDD[list_names#27,name#28]

Spark needs to evaluate the Python UDF in case it might be needed.
My guess is that the architecture of the PythonUDF pipeline requires the
values to be processed together in a batch. It appears that the result is
stored into a column reference that is then used by the WholeStageCodegen
phase that follows the UDF evaluation:

[image: Screen Shot 2019-05-13 at 4.31.17 PM.png]
If you look at the code that is generated by the codegen, it seems like the
or condition might be optimized into a nested if..then..else statement but
I'm not experienced in digging into codegen output.
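
Since the UDF may be invoked for every row regardless of what the other
side of the "|" evaluates to, one workaround is to make the Python function
itself tolerate nulls. A small sketch, where the name contains_safe is made
up here and is not something from the thread:

```python
def contains_safe(s, arr):
    """Return True if `s` is in `arr`, treating a missing list as "no match"."""
    if arr is None:
        return False
    return s in arr


# With PySpark available, this would be wrapped the same way as in the
# question:
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import BooleanType
#   contains = udf(contains_safe, BooleanType())
```

With the guard inside the function, the result no longer depends on
whether Spark skips the UDF for rows where the isNull() check is true.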

Hope this helps!

-Nick

Nicholas Szandor Hakobian, Ph.D.
Principal Data Scientist
Rally Health




On Mon, May 13, 2019 at 8:38 AM Rishi Shah  wrote:

> Hi All,
>
> I am using or operator "|" in withColumn clause on a DataFrame in pyspark.
> However it looks like it always evaluates all the conditions regardless of
> first condition being true. Please find a sample below:
>
> contains = udf(lambda s, arr : s in arr, BooleanType())
>
> df.withColumn('match_flag', (col('list_names').isNull()) |
> (contains(col('name'), col('list_names'))))
>
> Here where list_names is null, it starts to throw an error : NoneType is
> not iterable.
>
> Any idea?
>
> --
> Regards,
>
> Rishi Shah
>


Re: Does Pyspark Support Graphx?

2018-02-19 Thread Nicholas Hakobian
If you copy the jar file and all of its dependencies to the machines, you
can manually add them to the classpath. If you are using YARN and HDFS, you
can alternatively use --jars and point it to the HDFS locations of the jar
files, and it will (in most cases) distribute them to the worker nodes at
job submission time.


Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Sun, Feb 18, 2018 at 7:24 PM, xiaobo  wrote:

> Another question is how to install graphframes permanently when the spark
> nodes can not connect to the internet.
>
>
>
> -- Original --
> *From:* Denny Lee 
> *Date:* Mon,Feb 19,2018 10:23 AM
> *To:* xiaobo 
> *Cc:* user@spark.apache.org 
> *Subject:* Re: Does Pyspark Support Graphx?
>
> Note the --packages option works for both PySpark and Spark (Scala).  For
> the SparkLauncher class, you should be able to include packages ala:
>
> spark.addSparkArg("--packages", "graphframes:0.5.0-spark2.0-s_2.11")
>
>
> On Sun, Feb 18, 2018 at 3:30 PM xiaobo  wrote:
>
>> Hi Denny,
>> The pyspark script uses the --packages option to load graphframe library,
>> what about the SparkLauncher class?
>>
>>
>>
>> -- Original --
>> *From:* Denny Lee 
>> *Date:* Sun,Feb 18,2018 11:07 AM
>> *To:* 94035420 
>> *Cc:* user@spark.apache.org 
>> *Subject:* Re: Does Pyspark Support Graphx?
>> That’s correct - you can use GraphFrames though as it does support
>> PySpark.
>> On Sat, Feb 17, 2018 at 17:36 94035420  wrote:
>>
>>> I can not find anything for graphx module in the python API document,
>>> does it mean it is not supported yet?
>>>
>>


Re: Schema - DataTypes.NullType

2018-02-11 Thread Nicholas Hakobian
I spent a few minutes poking around in the source code and found this:

The data type representing None, used for the types that cannot be inferred.

https://github.com/apache/spark/blob/branch-2.1/python/pyspark/sql/types.py#L107-L113

Playing around a bit, this is the only use case that I could immediately
come up with: you have some type of placeholder field already in the data,
but it's always null. If you let createDataFrame (and I bet other things
like DataFrameReader would behave similarly) try to infer it directly, it
will error out since it can't infer the schema automatically. Doing
something like the below will allow the data to be used. And, if memory
serves, Hive also has a concept of a Null data type for these situations.

In [9]: df = spark.createDataFrame([Row(id=1, val=None), Row(id=2,
val=None)], schema=StructType([StructField('id', LongType()),
StructField('val', NullType())]))

In [10]: df.show()
+---+----+
| id| val|
+---+----+
|  1|null|
|  2|null|
+---+----+


In [11]: df.printSchema()
root
 |-- id: long (nullable = true)
 |-- val: null (nullable = true)


Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Sun, Feb 11, 2018 at 5:40 AM, Jean Georges Perrin  wrote:

> What is the purpose of DataTypes.NullType, specially as you are building a
> schema? Have anyone used it or seen it as part of a schema auto-generation?
>
>
> (If I keep asking long enough, I may get an answer, no? :) )
>
>
> > On Feb 4, 2018, at 13:15, Jean Georges Perrin  wrote:
> >
> > Any taker on this one? ;)
> >
> >> On Jan 29, 2018, at 16:05, Jean Georges Perrin  wrote:
> >>
> >> Hi Sparkians,
> >>
> >> Can someone tell me what is the purpose of DataTypes.NullType,
> specially as you are building a schema?
> >>
> >> Thanks
> >>
> >> jg
> >>
> >
> >
>


Re: Spark Dataframe and HIVE

2018-02-09 Thread Nicholas Hakobian
It's possible that the format of your table is not compatible with your
version of Hive, so Spark saved it in a way such that only Spark can read
it. When this happens, it prints out a very visible warning letting you
know.

We've seen it most frequently when trying to save a Parquet file with a
column in date format into a Hive table. In older versions of Hive, its
Parquet reader/writer did not support Date formats (among a couple of
others).

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Fri, Feb 9, 2018 at 9:59 AM, Prakash Joshi 
wrote:

> Ravi,
>
> Can you send the result of
> Show create table your_table_name
>
> Thanks
> Prakash
>
> On Feb 9, 2018 8:20 PM, "☼ R Nair (रविशंकर नायर)" <
> ravishankar.n...@gmail.com> wrote:
>
>> All,
>>
>> It has been three days continuously I am on this issue. Not getting any
>> clue.
>>
>> Environment: Spark 2.2.x, all configurations are correct. hive-site.xml
>> is in spark's conf.
>>
>> 1) Step 1: I created a data frame DF1 reading a csv file.
>>
>> 2) Did  manipulations on DF1. Resulting frame is passion_df.
>>
>> 3) passion_df.write.format("orc").saveAsTable("sampledb.passion")
>>
>> 4) The metastore shows the hive table., when I do "show tables" in HIVE,
>> I can see table name
>>
>> 5) I can't select in HIVE, though I can select from SPARK as
>> spark.sql("select * from sampledb.passion")
>>
>> What's going on here? Please help. Why am I not seeing data from the HIVE
>> prompt?
>> The "describe formatted " command on the table in HIVE shows the data is
>> in default warehouse location ( /user/hive/warehouse) since I set it.
>>
>> I am not getting any definite answer anywhere. Many suggestions and
>> answers given in Stackoverflow et al.Nothing really works.
>>
>> So asking experts here for some light on this, thanks
>>
>> Best,
>> Ravion
>>
>>
>>


Re: Subqueries

2017-12-29 Thread Nicholas Hakobian
This sounds like a perfect use case for window functions. Have you
tried something like the following:

select ACCT_ID, CR_RVKD_STAT_CD, ACCT_SFX_NUM, SCURT_FRD_STAT_CD,
CLSD_REAS_CD from (select *, max(instnc_id) over () as max_inst_id FROM
Stat_hist) where instnc_id=max_inst_id

However, I have seen instances where window functions without partitioning
clauses will cause all partitions to be executed on one task (and spark
usually warns about this condition) and this will be very slow. It might
actually be more performant to use the inner join which, even though it is
scanning through the raw data twice, is more parallelizable.

If you have your data stored in a columnar compressed data format like
Parquet or ORC, the query on the right side of the join should only need to
read a single column, so I/O for it would be significantly less than for the
full table; you might even be able to squeeze some more performance out of
it (depending on the size of the table) by caching it beforehand.

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com

On Fri, Dec 29, 2017 at 1:02 PM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> I have a table, and I want to find the latest records in the table. The
> table has a column called instnc_id that is incremented everyday. So, I
> want to find the records that have the max instnc_id.
>
>
>
> I am trying to do this using subqueries, but it gives me an error. For
> example, when I try this
>
>
>
> select ACCT_ID, CR_RVKD_STAT_CD, ACCT_SFX_NUM, SCURT_FRD_STAT_CD,
> CLSD_REAS_CD from (select *, max(instnc_id) as max_inst_id FROM Stat_hist)
> where instnc_id=max_inst_id
>
>
>
> the error I get is
>
>
>
> Caused by: org.apache.spark.sql.AnalysisException: cannot resolve
> '`max_inst_id`' given input columns: [CR_RVKD_STAT_CD, ACCT_SFX_NUM,
> CLSD_REAS_CD, ACCT_ID, instnc_id, SCURT_FRD_STAT_CD]; line 1 pos 172;
>
> 'Project ['ACCT_ID, 'CR_RVKD_STAT_CD, 'ACCT_SFX_NUM, 'SCURT_FRD_STAT_CD,
> CLSD_REAS_CD, scalar-subquery#298 [] AS max_inst_id#299]
>
> :  +- 'Project [unresolvedalias('max('instnc_id), None)]
>
> : +- 'UnresolvedRelation `Stat_hist`
>
> +- 'Filter (instnc_id#92 = 'max_inst_id)
>
>+- SubqueryAlias stat_hist
>
>   +- Project [ACCT_ID#0, ACCT_SFX_NUM#1, CR_RVKD_STAT_CD#23,
> SCURT_FRD_STAT_CD#34, CLSD_REAS_CD#19, instnc_id#92]
>
>
>
> I have tried various combinations but I keep getting into the same
> problem: It doesn’t recognize max_inst_id as a column.
>
>
>
> The only thing that works is if I get max_inst_id in a dataframe and then
> inner join it with the original table
>


Re: NLTK with Spark Streaming

2017-11-28 Thread Nicholas Hakobian
Depending on your needs, it's fairly easy to write a lightweight Python
wrapper around the Databricks spark-corenlp library:
https://github.com/databricks/spark-corenlp


Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Sun, Nov 26, 2017 at 8:19 AM, ashish rawat  wrote:

> Thanks Holden and Chetan.
>
> Holden - Have you tried it out, do you know the right way to do it?
> Chetan - yes, if we use a Java NLP library, it should not be any issue in
> integrating with spark streaming, but as I pointed out earlier, we want to
> give flexibility to data scientists to use the language and library of
> their choice, instead of restricting them to a library of our choice.
>
> On Sun, Nov 26, 2017 at 9:42 PM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>> But you can still use Stanford NLP library and distribute through spark
>> right !
>>
>> On Sun, Nov 26, 2017 at 3:31 PM, Holden Karau 
>> wrote:
>>
>>> So it’s certainly doable (it’s not super easy mind you), but until the
>>> arrow udf release goes out it will be rather slow.
>>>
>>> On Sun, Nov 26, 2017 at 8:01 AM ashish rawat 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Has someone tried running NLTK (python) with Spark Streaming (scala)? I
>>>> was wondering if this is a good idea and what are the right Spark operators
>>>> to do this? The reason we want to try this combination is that we don't
>>>> want to run our transformations in python (pyspark), but after the
>>>> transformations, we need to run some natural language processing operations
>>>> and we don't want to restrict the functions data scientists' can use to
>>>> Spark natural language library. So, Spark streaming with NLTK looks like
>>>> the right option, from the perspective of fast data processing and data
>>>> science flexibility.
>>>>
>>>> Regards,
>>>> Ashish
>>>>
>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>


Re: How to flatten a row in PySpark

2017-10-12 Thread Nicholas Hakobian
Using explode on the 4th column, followed by an explode on the 5th column,
would produce what you want (you might need to use split on the columns
first if they are not already arrays).
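
As a rough sketch of that suggestion: expand_row below is a plain-Python
model of what split followed by two explodes produces for one record, and
explode_columns is the PySpark equivalent. Both function names and the
column-name parameters are made up for illustration; split, explode and col
are the real functions from pyspark.sql.functions.

```python
from itertools import product


def expand_row(line, sep="|", list_sep=","):
    """Expand one delimited record into one record per (4th, 5th) value pair.

    Plain-Python model of split + explode applied to the 4th and 5th fields.
    """
    fields = line.split(sep)
    fourth = fields[3].split(list_sep)
    fifth = fields[4].split(list_sep)
    return [
        sep.join(fields[:3] + [a, b] + fields[5:])
        for a, b in product(fourth, fifth)
    ]


def explode_columns(df, first_col, second_col, list_sep=","):
    """PySpark version: split both string columns, then explode each in turn.

    pyspark is imported lazily so that expand_row works without Spark.
    """
    from pyspark.sql.functions import col, explode, split

    return (
        df.withColumn(first_col, explode(split(col(first_col), list_sep)))
          .withColumn(second_col, explode(split(col(second_col), list_sep)))
    )


rows = expand_row("ABZ|ABZ|AF|2,3|1,2|730")
```

Each explode multiplies the row count by the number of elements in that
column, so the two in sequence yield every combination of the two lists.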

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Thu, Oct 12, 2017 at 9:09 AM, Debabrata Ghosh 
wrote:

> Hi,
> Greetings !
>
> I am having data in the format of the following row:
>
> ABZ|ABZ|AF|2,3,7,8,B,C,D,E,J,K,L,M,P,Q,T,U,X,Y|1,2,3,4,5|730
>
> I want to convert it into several rows in the format below:
>
> ABZ|ABZ|AF|2|1|730
> ABZ|ABZ|AF|3+1|730
> .
> .
> .
> ABZ|ABZ|AF|3|1|730
> ABZ|ABZ|AF|3|2|730
> ABZ|ABZ|AF|3|3|730
> .
> .
> .
> ABZ|ABZ|AF|Y|4|730
> ABZ|ABZ|AF||Y|5|730
>
> Basically, I want to consider the various combinations of the 4th and 5th
> columns (where the values are delimited by commas) and accordingly generate
> the above rows from a single row. Could you please suggest a good way
> of achieving this? Thanks in advance !
>
> Regards,
>
> Debu
>


Re: how do you deal with datetime in Spark?

2017-10-03 Thread Nicholas Hakobian
I'd suggest first converting the string containing your date/time to a
TimestampType or a DateType. The built-in functions for year, month,
day, etc. will then work as expected. If your date is in a "standard"
format, you can perform the conversion just by casting the column to a date
or timestamp type. The formats it can auto-convert are listed at this
link:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L270-L295

If casting won't work, you can convert it manually by specifying a format
string with the following built-in function:
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.unix_timestamp

The format string uses the Java SimpleDateFormat pattern syntax, if I
remember correctly (
http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html).

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Tue, Oct 3, 2017 at 10:43 AM, Adaryl Wakefield <
adaryl.wakefi...@hotmail.com> wrote:

> I gave myself a project to start actually writing Spark programs. I’m
> using Scala and Spark 2.2.0. In my project, I had to do some grouping and
> filtering by dates. It was awful and took forever. I was trying to use
> dataframes and SQL as much as possible. I see that there are date functions
> in the dataframe API but trying to use them was frustrating. Even following
> code samples was a headache because apparently the code is different
> depending on which version of Spark you are using. I was really hoping for
> a rich set of date functions like you’d find in T-SQL but I never really
> found them.
>
>
>
> Is there a best practice for dealing with dates and time in Spark? I feel
> like taking a date/time string and converting it to a date/time object and
> then manipulating data based on the various components of the timestamp
> object (hour, day, year etc.) should be a heck of a lot easier than what
> I’m finding and perhaps I’m just not looking in the right place.
>
>
>
> You can see my work here:
> https://github.com/BobLovesData/Apache-Spark-In-24-Hours/blob/master/src/net/massstreet/hour10/BayAreaBikeAnalysis.scala
>
>
>
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685
>
> www.massstreet.net
>
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData 
>
>
>
>
>


Re: Questions regarding Jobs, Stages and Caching

2017-05-25 Thread Nicholas Hakobian
If you do not specify a schema, then the json() function will attempt to
determine the schema, which requires a full scan of the file. Any
subsequent actions will again have to read in the data. See the
documentation at:

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

"If the schema parameter is not specified, this function goes through the
input once to determine the input schema."
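
To skip that extra inference pass, a schema can be supplied up front. A
minimal PySpark sketch, assuming an existing SparkSession named `spark` and
a StructType that matches the files; the helper name
read_json_with_schema and the field names in the usage comment are
placeholders, not details from this thread.

```python
def read_json_with_schema(spark, path, schema):
    """Read JSON files using a caller-supplied schema instead of inferring one.

    `spark` is an existing SparkSession; `schema` is a
    pyspark.sql.types.StructType describing the files.
    """
    return spark.read.schema(schema).json(path)


# Hypothetical usage (field names are placeholders):
#   from pyspark.sql.types import StructType, StructField, StringType, LongType
#   schema = StructType([
#       StructField("id", LongType()),
#       StructField("name", StringType()),
#   ])
#   files_df = read_json_with_schema(spark, path, schema)
```

With a schema given, defining the DataFrame should not need to scan the
input, so the files are read only when the first action runs.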


Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Thu, May 25, 2017 at 9:24 AM, Steffen Schmitz 
wrote:

> Hi Ram,
>
> spark.read.json() should be evaluated on the first call of .count().
> The data should then be read into memory once and the rows counted. After
> this operation it will be in memory and access will be faster.
> If you add println statements between your function calls, you should
> see that Spark starts to work only after the call of count.
>
> Regards,
> Steffen
>
> On 25. May 2017, at 17:02, Ram Navan  wrote:
>
> Hi Steffen,
>
> Thanks for your response.
>
> Isn't spark.read.json() an action function? It reads the files from the
> source directory, infers the schema and creates a dataframe right?
> dataframe.cache() prints out this schema as well. I am not sure why
> dataframe.count() will try to do the same thing again (reading files from
> source). spark.read.json() and count() - both actions took 8 minutes each
> in my scenario. I'd expect only one of the action should incur the expenses
> of reading 19949 files from s3. Am I missing anything?
>
> Thank you!
>
> Ram
>
>
> On Thu, May 25, 2017 at 1:34 AM, Steffen Schmitz <
> steffenschm...@hotmail.de> wrote:
>
>> Hi Ram,
>>
>> Regarding your caching question:
>> The data frame is evaluated lazy. That means it isn’t cached directly on
>> invoking of .cache(), but on calling the first action on it (in your case
>> count).
>> Then it is loaded into memory and the rows are counted, not on the call
>> of .cache().
>> On the second call to count it is already in memory and cached and that’s
>> why it’s faster.
>>
>> I do not know if it’s allowed to recommend resources here, but I really
>> liked the Big Data Analysis with Spark Course by Heather Miller on Coursera.
>> And the Spark documentation is also a good place to start.
>>
>> Regards,
>> Steffen
>>
>> > On 25. May 2017, at 07:28, ramnavan  wrote:
>> >
>> > Hi,
>> >
>> > I’m new to Spark and trying to understand the inner workings of Spark
>> in the
>> > below mentioned scenarios. I’m using PySpark and Spark 2.1.1
>> >
>> > Spark.read.json():
>> >
>> > I am running executing this line
>> > “spark.read.json(‘s3a:///*.json’)” and a cluster with
>> three
>> > worker nodes (AWS M4.xlarge instances). The bucket has about 19949 json
>> > files and the total size is about 4.4 GB. The line created three spark
>> jobs
>> > first job with 1 tasks, second job with 19949 tasks and third job
>> with
>> > 1 tasks. Each of the jobs have one stage in it. Please refer to the
>> > attached images job0, job1 and job2.jpg.
>> > I was expecting it to create 1 job with 19949 tasks.  I’d like to
>> understand
>> > why there are three jobs instead of just one and why reading json files
>> > calls for map operation.
>> >
>> > Caching and Count():
>> >
>> > Once spark reads 19949 json files into a dataframe (let’s call it
>> > files_df), I am calling these two operations:
>> > files_df.createOrReplaceTempView(“files”) and files_df.cache(). I am
>> > expecting files_df.cache() will cache the entire dataframe in memory so
>> > any subsequent operation will be faster. My next statement is
>> > files_df.count(). This operation took an entire 8.8 minutes and it looks
>> > like it read the files again from s3 and calculated the count. Please
>> > refer to attached count.jpg file for reference.
>> > Why is this happening? If I call files_df.count() for the second time,
>> it
>> > comes back fast within few seconds. Can someone explain this?
>> >
>> > In general, I am looking for a good source to learn about Spark
>> Internals
>> > and try to understand what’s happening beneath the hood.
>> >
>> > Thanks in advance!
>> >
>> > Ram
>> >
>> >
>> >
>>
>

Re: java.lang.java.lang.UnsupportedOperationException

2017-04-19 Thread Nicholas Hakobian
CDH 5.5 only provides Spark 1.5. Are you managing your pySpark install
separately?

For something like your example, you will get significantly better
performance using coalesce with a lit, like so:

from pyspark.sql.functions import col, lit, coalesce

def replace_empty(icol):
    # Replace nulls in column `icol` with an empty string, keeping its name.
    return coalesce(col(icol), lit("")).alias(icol)

and use it similarly to what you are doing (I would build a function around
your if logic, it's easier to understand):

def _if_not_in_processing(icol):
    # Columns outside colprocessing pass through unchanged.
    return icol if (icol not in colprocessing) else replace_empty(icol)

dfTotaleNormalize53 = dfTotaleNormalize52.select(
    [_if_not_in_processing(i) for i in dfTotaleNormalize52.columns])

Otherwise there isn't anything obvious to me as to why it isn't working. If
you actually do have pySpark 1.5 and not 1.6 I know it handles UDF
registration differently.

Hope this helps.


Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Wed, Apr 19, 2017 at 5:13 AM, issues solution 
wrote:

> PySpark 1.6 on Cloudera 5.5 (YARN)
>
> 2017-04-19 13:42 GMT+02:00 issues solution :
>
>> Hi ,
>> Can someone tell me why I get the following error when applying a UDF
>> like this:
>>
>> def replaceCempty(x):
>>     if x is None :
>>         return ""
>>     else :
>>         return x.encode('utf-8')
>> udf_replaceCempty = F.udf(replaceCempty,StringType())
>>
>> dfTotaleNormalize53 = dfTotaleNormalize52.select([i if i not in
>> colprocessing  else  udf_replaceCempty(F.col(i)).alias(i) for i in
>> dfTotaleNormalize52.columns])
>>
>>
>> java.lang.java.lang.UnsupportedOperationException
>>
>>  Cannot evaluate expression: PythonUDF#replaceCempty(input[77,string])
>>
>> ??
>> regards
>>
>>
>>
>>
>


Re: Spark and Hive connection

2017-04-06 Thread Nicholas Hakobian
Spark connects directly to the Hive metastore service in order to manage
table definitions, locations, and such. If you are using the CLI
interfaces and turn on INFO level logging, you can see when you instantiate
a HiveContext that it is connecting to the Hive Metastore, along with the
URL it's using for the connection.

Hope this helps,
Nick

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Wed, Apr 5, 2017 at 10:06 PM, infa elance  wrote:

> Hi all,
> When using spark-shell my understanding is spark connects to hive through
> metastore.
> The question i have is does spark connect to metastore , is it JDBC?
>
> Thanks and Regards,
> Ajay.
>


Re: Re: Re: how to change datatype by useing StructType

2017-01-12 Thread Nicholas Hakobian
Have you tried the native CSV reader (in Spark 2) or the Databricks CSV
reader (in 1.6)?

If your data is in a CSV-like format, it'll load it directly into a
DataFrame. It's possible you have some rows where types are inconsistent.

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Thu, Jan 12, 2017 at 1:52 AM, lk_spark  wrote:

> I have try like this:
>
>   val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
>   val rowRDD = peopleRDD.map(_.split(",")).map(attributes => {
>     val ab = ArrayBuffer[Any]()
>     for (i <- 0 until schemaType.length) {
>       if (schemaType(i).equalsIgnoreCase("int")) {
>         ab += attributes(i).toInt
>       } else if (schemaType(i).equalsIgnoreCase("long")) {
>         ab += attributes(i).toLong
>       } else {
>         ab += attributes(i)
>       }
>     }
>     Row(ab.toArray)
>   })
>
> val peopleDF = spark.createDataFrame(rowRDD, schema)
> peopleDF .show
>
> I got error:
>  Caused by: java.lang.RuntimeException: [Ljava.lang.Object; is not a
> valid external type for schema of string
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> SpecificUnsafeProjection.apply_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> SpecificUnsafeProjection.apply(Unknown Source)
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.
> toRow(ExpressionEncoder.scala:290)
> all the file was Any, what should I do?
>
>
>
> 2017-01-12
> --
> lk_spark
> --
>
> *From:* "lk_spark"
> *Sent:* 2017-01-12 14:38
> *Subject:* Re: Re: how to change datatype by useing StructType
> *To:* "ayan guha","user.spark"
> *Cc:*
>
> yes, field year is in my data:
>
> data:
>   kevin,30,2016
>   shen,30,2016
>   kai,33,2016
>   wei,30,2016
>
> this will not work
>val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
> Row(attributes(0),attributes(1),attributes(2)))
> but I need read data by configurable.
> 2017-01-12
> --
> lk_spark
> --
>
> *From:* ayan guha
> *Sent:* 2017-01-12 14:34
> *Subject:* Re: how to change datatype by useing StructType
> *To:* "lk_spark","user.spark"
> *Cc:*
>
> Do you have year in your data?
> On Thu, 12 Jan 2017 at 5:24 pm, lk_spark  wrote:
>
>> hi,all
>>
>> I have a txt file ,and I want to process it as dataframe :
>>
>> data like this :
>>    name1,30
>>    name2,18
>>
>> val schemaString = "name age year"
>> val xMap=new scala.collection.mutable.HashMap[String,DataType]()
>> xMap.put("name", StringType)
>> xMap.put("age", IntegerType)
>> xMap.put("year", IntegerType)
>>
>> val fields = schemaString.split(" ").map(fieldName =>
>>   StructField(fieldName, xMap.get(fieldName).get, nullable = true))
>> val schema = StructType(fields)
>>
>> val peopleRDD = spark.sparkContext.textFile("/sourcedata/test/test*")
>> //spark.read.schema(schema).text("/sourcedata/test/test*")
>>
>> val rowRDD = peopleRDD.map(_.split(",")).map(attributes
>>   => Row(attributes(0),attributes(1))
>>
>> // Apply the schema to the RDD
>> val peopleDF = spark.createDataFrame(rowRDD, schema)
>>
>> but when I write it to table or show it I will got error:
>>
>>    Caused by: java.lang.RuntimeException: Error while encoding:
>> java.lang.RuntimeException: java.lang.String is not a valid external type
>> for schema of int
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
>> object).isNullAt) null else staticinvoke(class
>> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true], top level row object), 0, name),
>> StringType), true) AS name#1
>> +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
>> row object).isNullAt) null else staticinvoke(class
>> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true], top level row object), 0, name),
>> StringType), true)
>>
>>    if I change my code it will work:
>>
>>    val rowRDD = peopleRDD.map(_.split(",")).map(attributes =>
>>      Row(attributes(0),attributes(1).toInt)
>>
>>    but this is not a good idea .
>>
>> 2017-01-12
>> --
>> lk_spark
>>
>


Re: What is the difference between hive on spark and spark on hive?

2017-01-09 Thread Nicholas Hakobian
Hive on Spark is Hive itself: it takes SQL statements in and creates Spark
jobs to process them, instead of MapReduce or Tez jobs.

There is no such thing as "Spark on Hive", but there is SparkSQL. SparkSQL
accepts either programmatic statements or SQL statements, and in both cases
produces a native Spark DataFrame. It does provide connectivity to the Hive
metastore, and in Spark 1.6 it calls into Hive to provide functionality
that doesn't yet exist natively in Spark. I'm not sure how much of that
still exists in Spark 2.0, but I think much of it has been converted into
native Spark functions.

There are also the SparkSQL shell and the Thrift server, which provide a
SQL-only interface but use the full native Spark pipeline.
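
As a minimal sketch of the "programmatic or SQL" point, assuming Spark 2.x
and a local session (the sample data and the view name `people` are made up
for illustration), both routes below produce an ordinary DataFrame with the
same rows:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for illustration only. Calling .enableHiveSupport() on the
// builder is what would connect the session to a Hive metastore; it is left
// out here so the sketch runs without Hive on the classpath.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("sparksql-sketch")
  .getOrCreate()

// Hypothetical sample data registered as a temporary view.
val people = spark.createDataFrame(Seq(("wei", 30), ("li", 18))).toDF("name", "age")
people.createOrReplaceTempView("people")

// Route 1: a SQL statement, parsed by SparkSQL.
val viaSql = spark.sql("SELECT name FROM people WHERE age > 20")

// Route 2: the same query expressed programmatically.
val viaApi = people.filter(col("age") > 20).select("name")

viaSql.show()
viaApi.show()
```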

Hope this helps!
-Nick

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com

On Mon, Jan 9, 2017 at 7:05 AM, 李斌松  wrote:

> What is the difference between hive on spark and spark on hive?
>


Re: Custom delimiter file load

2016-12-31 Thread Nicholas Hakobian
See the documentation for the options given to the csv function:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@csv(paths:String*):org.apache.spark.sql.DataFrame

The options can be passed to the DataFrameReader through its option/options
methods (a similar syntax is also available in PySpark).
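
A minimal sketch of what that looks like in Scala, assuming Spark 2.x and a
local session. The temp file and its contents are made up so the example is
self-contained; in practice `path` would be your own file:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("pipe-delimited-sketch")
  .getOrCreate()

// Write a tiny pipe-delimited sample file (hypothetical data).
val path = Files.createTempFile("people", ".psv")
Files.write(path, "name|age\nwei|30\nli|18\n".getBytes(StandardCharsets.UTF_8))

val df = spark.read
  .option("sep", "|")            // field delimiter
  .option("header", "true")      // first line holds the column names
  .option("inferSchema", "true") // let Spark guess the column types
  .csv(path.toUri.toString)

df.printSchema()
df.show()
```

An explicit schema can be supplied with `.schema(...)` on the reader instead
of `inferSchema` when the column types are known up front.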

-Nick


Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Sat, Dec 31, 2016 at 9:58 AM, A Shaikh  wrote:

> In PySpark 2, loading a file with any delimiter into a DataFrame is pretty
> straightforward:
> spark.read.csv(file, schema=, sep='|')
>
> Is there something similar in Spark 2 in Scala, e.g. spark.read.csv(path,
> sep='|')?
>
>


Re: Best way to process lookup ETL with Dataframes

2016-12-30 Thread Nicholas Hakobian
Yep, sequential joins are what I have done in the past with similar
requirements.

Splitting and merging DataFrames is most likely killing performance if you
do not cache the DataFrame pre-split. If you do cache it, Spark will compute
the lineage prior to the cache statement once (at first invocation), then use
the cached result to perform the additional join, then union the results.
Without the cache, you are most likely computing the full lineage twice,
all the way back to the raw data import, and incurring double the read I/O.

The best path will most likely depend on the size of the tables you
are joining to. If both are small (compared to the primary data source) and
can be broadcast, doing the sequential joins will most likely be the
easiest and most efficient approach. If one (or both) of the tables you are
joining to is too large to be broadcast efficiently, going through the
join / cache / split / second join / union path is likely to be faster. It
also depends on how much memory you can dedicate to caching...the
possibilities are endless.
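
A rough sketch of the join / cache / split / second join / union path,
written against the Spark 2.x API (on Spark 1.x the `union` call would be
`unionAll`). The table names, column names and data are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cache-split-union-sketch")
  .getOrCreate()

// Hypothetical tables: `driving` is the primary data, the lookups map id -> value.
val driving = spark.createDataFrame(Seq((1, "x"), (2, "y"), (3, "z"))).toDF("id", "payload")
val lookup1 = spark.createDataFrame(Seq((1, "from-lookup1"))).toDF("id", "resolved")
val lookup2 = spark.createDataFrame(Seq((2, "from-lookup2"))).toDF("id", "resolved")

// First join, cached so the two filters below reuse one computation
// instead of each re-running the lineage back to the source.
val firstPass = driving.join(lookup1, Seq("id"), "left_outer").cache()

// Split on whether the first lookup succeeded.
val hits = firstPass.filter(col("resolved").isNotNull)
val misses = firstPass.filter(col("resolved").isNull).drop("resolved")

// Second join only for the rows that missed, then merge the halves.
// union matches columns by position: both sides are (id, payload, resolved).
val secondPass = misses.join(lookup2, Seq("id"), "left_outer")
val merged = hits.union(secondPass)

merged.orderBy("id").show()
```

Row 3 matches neither lookup here, so its `resolved` value stays null.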

I tend to approach this type of problem by weighing the cost of extra
development time for a more complex join against the extra execution time
and the frequency of execution. For something that will execute daily (or
more frequently), the cost of more development to get a faster execution
time (even if it's only 2x faster) might be worth it.

It might also be worth investigating if a newer version of Spark (1.6 at
the least, or 2.0 if possible) is feasible to install. There are lots of
performance improvements in those versions, if you have the option of
upgrading.

-Nick

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com


On Fri, Dec 30, 2016 at 3:35 PM, Sesterhenn, Mike 
wrote:

> Thanks Nicholas.  It looks like for some of my use cases, I might be able
> to do sequential joins, and then use coalesce() (possibly in combination
> with withColumn(when()...)) to sort out the results.
>
>
> Splitting and merging DataFrames seems to really kill my app
> performance.  I'm not sure if it's a Spark 1.5 thing or what, but I just
> refactored one column to do one less split/merge, and it saved me almost
> half the time on my job.  But for some use cases I don't seem to be able to
> avoid them.  It is important in some cases to NOT do a join under certain
> conditions for a row because bad data will result.
>
>
> Any other thoughts?
> --
> *From:* Nicholas Hakobian 
> *Sent:* Friday, December 30, 2016 2:12:40 PM
> *To:* Sesterhenn, Mike
> *Cc:* ayan guha; user@spark.apache.org
>
> *Subject:* Re: Best way to process lookup ETL with Dataframes
>
> It looks like Spark 1.5 has the coalesce function, which is like NVL, but
> a bit more flexible. From Ayan's example you should be able to use:
> coalesce(b.col, c.col, 'some default')
>
> If that doesn't have the flexibility you want, you can always use nested
> case or if statements, but they're just harder to read.
>
> Nicholas Szandor Hakobian, Ph.D.
> Senior Data Scientist
> Rally Health
> nicholas.hakob...@rallyhealth.com
>
>
>
> On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike 
> wrote:
>
>> Thanks, but is nvl() in Spark 1.5?  I can't find it in
>> spark.sql.functions
>> (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)
>>
>>
>> Reading about the Oracle nvl function, it seems it is similar to the na
>> functions.  Not sure it will help though, because what I need is to join
>> after the first join fails.
>>
>> --
>> *From:* ayan guha 
>> *Sent:* Thursday, December 29, 2016 11:06 PM
>> *To:* Sesterhenn, Mike
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Best way to process lookup ETL with Dataframes
>>
>> How about this -
>>
>> select a.*, nvl(b.col,nvl(c.col,'some default'))
>> from driving_table a
>> left outer join lookup1 b on a.id=b.id
>> left outer join lookup2 c on a.id=c.id
>>
>> ?
>>
>> On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike 
>> wrote:
>>
>>> Hi all,
>>>
>>>
>>> I'm writing an ETL process with Spark 1.5, and I was wondering the best
>>> way to do something.
>>>
>>>
>>> A lot of the fields I am processing require an algorithm similar to this:
>>>
>>>
>>> Join input dataframe to a lookup table.
>>>
>>> if (that lookup fails (the joined fields are null)) {
>>>
>>> Lookup into some other table to join some other fields.
>>>
>>> }
>>>
>>>

Re: Best way to process lookup ETL with Dataframes

2016-12-30 Thread Nicholas Hakobian
It looks like Spark 1.5 has the coalesce function, which is like NVL, but a
bit more flexible. From Ayan's example you should be able to use:
coalesce(b.col, c.col, 'some default')

If that doesn't have the flexibility you want, you can always use nested
case or if statements, but they're just harder to read.
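
A minimal sketch of the sequential-joins-plus-coalesce approach in the
DataFrame API. It is written against Spark 2.x with a local session, so on
1.5 the session setup would differ; the `coalesce` function itself lives in
`org.apache.spark.sql.functions`. Table names follow Ayan's example, and the
data is made up:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, lit}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("lookup-coalesce-sketch")
  .getOrCreate()

// Hypothetical data: id 1 is in both lookups, id 2 only in lookup2, id 3 in neither.
val driving = spark.createDataFrame(Seq((1, "x"), (2, "y"), (3, "z"))).toDF("id", "payload")
val lookup1 = spark.createDataFrame(Seq((1, "from-lookup1"))).toDF("id", "col")
val lookup2 = spark.createDataFrame(Seq((1, "ignored"), (2, "from-lookup2"))).toDF("id", "col")

val a = driving.as("a")
val b = lookup1.as("b")
val c = lookup2.as("c")

// Two left outer joins in sequence; coalesce picks the first non-null value,
// so lookup2 is only consulted when lookup1 had no match.
val resolved = a
  .join(b, col("a.id") === col("b.id"), "left_outer")
  .join(c, col("a.id") === col("c.id"), "left_outer")
  .select(
    col("a.id"),
    col("a.payload"),
    coalesce(col("b.col"), col("c.col"), lit("some default")).as("resolved"))

resolved.orderBy("id").show()
```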

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com



On Fri, Dec 30, 2016 at 7:46 AM, Sesterhenn, Mike 
wrote:

> Thanks, but is nvl() in Spark 1.5?  I can't find it in spark.sql.functions
> (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.functions$)
>
>
> Reading about the Oracle nvl function, it seems it is similar to the na
> functions.  Not sure it will help though, because what I need is to join
> after the first join fails.
>
> --
> *From:* ayan guha 
> *Sent:* Thursday, December 29, 2016 11:06 PM
> *To:* Sesterhenn, Mike
> *Cc:* user@spark.apache.org
> *Subject:* Re: Best way to process lookup ETL with Dataframes
>
> How about this -
>
> select a.*, nvl(b.col,nvl(c.col,'some default'))
> from driving_table a
> left outer join lookup1 b on a.id=b.id
> left outer join lookup2 c on a.id=c.id
>
> ?
>
> On Fri, Dec 30, 2016 at 9:55 AM, Sesterhenn, Mike 
> wrote:
>
>> Hi all,
>>
>>
>> I'm writing an ETL process with Spark 1.5, and I was wondering the best
>> way to do something.
>>
>>
>> A lot of the fields I am processing require an algorithm similar to this:
>>
>>
>> Join input dataframe to a lookup table.
>>
>> if (that lookup fails (the joined fields are null)) {
>>
>> Lookup into some other table to join some other fields.
>>
>> }
>>
>>
>> With Dataframes, it seems the only way to do this is to do something like
>> this:
>>
>>
>> Join input dataframe to a lookup table.
>>
>> if (that lookup fails (the joined fields are null)) {
>>
>>*SPLIT the dataframe into two DFs via DataFrame.filter(),
>>
>>   one group with successful lookup, the other failed).*
>>
>>For failed lookup:  {
>>
>>Lookup into some other table to grab some other fields.
>>
>>}
>>
>>*MERGE the dataframe splits back together via DataFrame.unionAll().*
>> }
>>
>>
>> I'm seeing some really large execution plans, as you might imagine, in the
>> Spark UI, and the processing time seems way out of proportion to the size
>> of the dataset (~250GB in 9 hours).
>>
>>
>> Is this the best approach to implement an algorithm like this?  Note also
>> that some fields I am implementing require multiple staged split/merge
>> steps due to cascading lookup joins.
>>
>>
>> Thanks,
>>
>>
>> *Michael Sesterhenn*
>>
>>
>> *msesterh...@cars.com  *
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-29 Thread Nicholas Hakobian
If you are using Spark 2.0 (as listed in the Stack Overflow post), why are
you using the external CSV module from Databricks? Spark 2.0 includes the
functionality from this external module natively, and it's possible you are
mixing an older library with a newer Spark, which could explain a crash.

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com



On Thu, Dec 29, 2016 at 4:00 AM, Palash Gupta <
spline_pal...@yahoo.com.invalid> wrote:

> Hi Marco,
>
> Thanks for your response.
>
> Yes, I tested it before and am able to load from the Linux filesystem, and
> it also sometimes has a similar issue.
>
> However, in both cases (either from Hadoop or the Linux filesystem), this
> error comes up in some specific scenarios, as per my observations:
>
> 1. When two separate Spark applications are initiated in parallel from one
> driver (not all the time, sometimes)
> 2. If one Spark job runs for longer than expected, let's say 2-3
> hours, the second application terminates with the error.
>
> To debug the problem, it would help if you could share some possible
> reasons why the "failed to get broadcast" error may occur.
>
> Or if you need more logs I can share them.
>
> Thanks again Spark User Group.
>
> Best Regards
> Palash Gupta
>
>
>
>
> On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni
>  wrote:
> Hi
>  Pls try to read a CSV from filesystem instead of hadoop. If you can read
> it successfully then your hadoop file is the issue and you can start
> debugging from there.
> Hth
>
> On 29 Dec 2016 6:26 am, "Palash Gupta" 
> wrote:
>
>> Hi Apache Spark User team,
>>
>>
>>
>> Greetings!
>>
>> I started developing an application using Apache Hadoop and Spark with
>> Python. My PySpark application randomly terminated saying "Failed to get
>> broadcast_1*", and I have been looking for suggestions and support on
>> Stack Overflow in the post "Failed to get broadcast_1_piece0 of
>> broadcast_1 in pyspark application".
>>
>>
>> Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application
>> I was building an application on Apache Spark 2.00 with Python 3.4 and
>> trying to load some CSV files from HDFS (...
>>
>> 
>>
>>
>> Could you please advise on registering for the Apache user list, or on
>> how I can get suggestions or support to debug the problem I am
>> facing?
>>
>> Your response will be highly appreciated.
>>
>>
>>
>> Thanks & Best Regards,
>> Engr. Palash Gupta
>> WhatsApp/Viber: +8801817181502
>> Skype: palash2494
>>
>>
>>


Re: How to filter based on a constant value

2016-07-30 Thread Nicholas Hakobian
From the online docs:
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/Row.html#apply(int)

java.lang.Object apply(int i)
Returns the value at position i. If the value is null, null is returned.
The following is a mapping between Spark SQL types and return types:

So in your snippet, collect returns an Array (of length 1) of Rows, and the
apply(0) on that Array returns its first element, the Row. getDate(0) then
returns the first element of that Row, typed as a Date.
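
The chain in the quoted code can be unpacked step by step. This is a sketch
under stated assumptions: Spark 2.x, a local session, and made-up rows
standing in for the ll_18740868 table. It also shows the extracted date being
reused as the filter value:

```scala
import java.sql.Date
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, max}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("row-apply-sketch")
  .getOrCreate()

// Hypothetical stand-in for the transactions table.
val txns = spark.createDataFrame(Seq(
  (Date.valueOf("2015-11-02"), "XYZ LTD CD 4636", 7.50),
  (Date.valueOf("2015-12-15"), "XYZ LTD CD 4636", 10.95),
  (Date.valueOf("2016-01-04"), "ABC STORES", 3.20)
)).toDF("transactiondate", "transactiondescription", "debitamount")

val hashtag = "XYZ"
val matching = txns.filter(col("transactiondescription").contains(hashtag))

// collect() brings the one-row aggregate back to the driver as Array[Row].
val rows: Array[Row] = matching.agg(max("transactiondate")).collect()

// rows(0) is rows.apply(0): the first (and only) Row of the array.
val firstRow: Row = rows(0)

// getDate(0) reads column 0 of that Row as java.sql.Date;
// firstRow(0) would return the same value typed as Any.
val maxDate: Date = firstRow.getDate(0)

// A java.sql.Date can be compared against a column directly,
// unlike the whole Row.
val latest = matching.filter(col("transactiondate") === maxDate)
latest.show()
```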


Nicholas Szandor Hakobian
Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com
M: 510-295-7113


On Sat, Jul 30, 2016 at 11:41 PM, Mich Talebzadeh  wrote:

> thanks gents.
>
> I am trying to understand this better.
>
> As I understand it, a DataFrame is basically the equivalent of a table in
> relational terms.
>
> so
>
> scala> var maxdate =
> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate"))
> maxdate: org.apache.spark.sql.DataFrame = [max(transactiondate): date]
>
> So I find the max(transactiondate) for the filter I have applied.  In SQL
> terms --> select max(transactiondate) from ll_18740868 where
> transactiondescription like "%HASHTAG%"
>
> Now I want to store it in a single variable and get it worked out
>
> scala> var maxdate =
> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect
> maxdate: Array[org.apache.spark.sql.Row] = Array([2015-12-15])
>
> Now I have the value stored in a row. I get it as follows. It is the
> first column of the row (actually the only column) and in date format
>
> scala> var maxdate =
> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.
> apply(0).getDate(0)
> maxdate: java.sql.Date = 2015-12-15
>
> what is the role of apply(0) here?
>
> Thanks
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 03:28, Xinh Huynh  wrote:
>
>> Hi Mitch,
>>
>> I think you were missing a step:
>> [your result] maxdate: org.apache.spark.sql.Row = [2015-12-15]
>> Since maxdate is of type Row, you would want to extract the first column
>> of the Row with:
>>
>> >> val maxdateStr = maxdate.getString(0)
>>
>> assuming the column type is String.
>> API doc is here:
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
>>
>> Then you can do the query:
>>
>> >> col("transactiondate") === maxdateStr
>>
>> Xinh
>>
>> On Sat, Jul 30, 2016 at 5:20 PM, ayan guha  wrote:
>>
>>> select *
>>> from (select *,
>>>  rank() over (order by transactiondate) r
>>>from ll_18740868 where transactiondescription='XYZ'
>>>   ) inner
>>> where r=1
>>>
>>> Hi Mitch,
>>>
>>> If using SQL is fine, you can try the code above. You need to register
>>> ll_18740868  as temp table.
>>>
>>> On Sun, Jul 31, 2016 at 6:49 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>

>>>> Hi,
>>>>
>>>> I would like to find out when it was the last time I paid a company
>>>> with Debit Card
>>>>
>>>>
>>>> This is the way I do it.
>>>>
>>>> 1) Find the date when I paid last
>>>> 2) Find the rest of details from the row(s)
>>>>
>>>> So
>>>>
>>>> var HASHTAG = "XYZ"
>>>> scala> var maxdate =
>>>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0)
>>>> maxdate: org.apache.spark.sql.Row = [2015-12-15]
>>>>
>>>> OK so it was 2015-12-15
>>>>
>>>>
>>>> Now I want to get the rest of the columns. This one works when I hard
>>>> code the maxdate!
>>>>
>>>>
>>>> scala> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)
>>>> && col("transactiondate") === "2015-12-15").select("transactiondate",
>>>> "transactiondescription", "debitamount").show
>>>> +---------------+----------------------+-----------+
>>>> |transactiondate|transactiondescription|debitamount|
>>>> +---------------+----------------------+-----------+
>>>> |     2015-12-15|       XYZ LTD CD 4636|      10.95|
>>>> +---------------+----------------------+-----------+
>>>>
>>>> Now if I want to use the var maxdate in place of "2015-12-15", how
>>>> would I do that?
>>>>
>>>> I tried lit(maxdate) etc but they are all giving me error?
>>>>
>>>> java.lang.RuntimeException: Unsupported literal type class
>>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
>>>> [2015-12-15]
>>>>
>>>>
>>>> Thanks

>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>