RE: Equivalent of Redshift ListAgg function in Spark (PySpark)

2017-10-08 Thread Mahesh Sawaiker
After doing the group, you can use mkString on the DataFrame. Following is an 
example where all columns are concatenated with a space as the separator.



scala> call_cdf.map(row => row.mkString(" ")).show(false)
+---------------------------------------------------------------------------------------------------------+
|value                                                                                                     |
+---------------------------------------------------------------------------------------------------------+
|1 BAAA 1998-01-01 null null 2450997 NY Metro large 2325 1374075 8AM-12AM Keith Cunningham 4 Matters may hear as; profita New, cold plants can put al Dante Cook 3 pri 4 ese 995 Park 3rd Dr. Suite 470 Five Points Ziebach County SD 56098 United States -6.0 0.02|
|2 CAAA 1998-01-01 2000-12-31 null 2450876 Mid Atlantic large 4208 837392 8AM-4PM Stephen Clem 3 Classes devote largely other, standard ter Free germans prove flatly industrial drugs. Low questions come to a equations. British, conservative Christopher Perez 6 cally 3 pri 245 Johnson Circle Suite 200 Fairview Williamson County TN 35709 United States -5.0 0.03|
|3 CAAA 2001-01-01 null null 2450876 Mid Atlantic small 3251 837392 8AM-4PM William Johnson 3 Classes devote largely other, standard ter Ridiculous requirements must not implement about pure values. Substances know powers. Political rel Derrick Burke 6 cally 3 pri 245 Johnson Circle Suite 200 Fairview Williamson County TN 35709 United States -5.0 0.03|
|4 EAAA 1998-01-01 2000-01-01 null 2450872 North Midwest large 2596 708708 8AM-4PM Lamont Greene 3 Events must find anyway Great rates must ensure famous, other banks. As main goals get home as a Marvin Dean 2 able 2 able 927 Oak Main ST Suite 150 Five Points Williamson County TN 36098 United States -5.0 0.03|
|5 EAAA 2000-01-02 2001-12-31 null 2450872 North Midwest medium 2596 708708 8AM-12AM Lamont Greene 3 Events must find anyway So fresh supplies keep meanwhile religious, labour years. Rapid, careful subject Matthew Williams 2 able 1 able 927 Oak Main ST Suite 150 Five Points Williamson County TN 36098 United States -5.0 0.0|
|6 EAAA 2002-01-01 null null 2450872 North Midwest small 2596 708708 8AM-4PM Emilio Romano 6 As well novel sentences check through the plans. Sophisticated cities fall for e William Johnson 5 anti 1 able 927 Oak Main ST Suite 150 Five Points Williamson County TN 36098 United States -5.0 0.07|
+---------------------------------------------------------------------------------------------------------+

For reference, call_cdf has the columns: cc_call_center_sk, cc_call_center_id, cc_rec_start_date, cc_rec_end_date, cc_closed_date_sk, cc_open_date_sk, cc_name, cc_class, cc_employees, cc_sq_ft, cc_hours, cc_manager, cc_mkt_id, cc_mkt_class, cc_mkt_desc, cc_market_manager, cc_division, cc_division_name, cc_company, cc_company_name, cc_street_number, cc_street_name, cc_street_type, cc_suite_number, cc_city, cc_county, cc_state, cc_zip, cc_country, cc_gmt_offset, cc_tax_percentage.

[Spark Structured Streaming] How to select events by latest timestamp and aggregate count

2017-10-08 Thread Li Zuwei
I would like to perform a structured streaming aggregation with a windowing
period, given the following data schema. The objective is to filter to the
latest occurring event per user, then aggregate the count of each event type
for each location.

time   location   user   type
1      A          1      one
2      A          1      two
1      B          2      one
2      B          2      one
1      A          3      two
1      A          4      one
Sample output:

location   countOne   countTwo
A          1          2
B          1          0

So far I have something like the following:

val aggTypes = df
  .select($"location", $"time", $"user", $"type")
  .groupBy($"user")
  .agg(max($"timestamp") as 'timestamp)
  .select("*")
  .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
  .groupBy(
    functions.window($"timestamp", DataConstant.t15min.toString + " seconds",
      DataConstant.t1min.toString + " seconds"),
    $"location")
  .agg(
    count(when($"type" === "one", $"type")) as 'countOne,
    count(when($"type" === "two", $"type")) as 'countTwo)
  .drop($"window")
Since structured streaming does not support multiple aggregations, and
non-time-based windows are not supported on streaming DataFrames/Datasets, I am
not sure whether it is possible to achieve the desired output in a single
streaming query.
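
For reference, a batch-only (non-streaming) sketch of the transformation I am
after, using max over a struct to keep each user's latest event (column names
as in the table above; `spark` is assumed to be the active SparkSession, and
the watermark/streaming restrictions are ignored here):

import org.apache.spark.sql.functions._
import spark.implicits._

// Keep each user's latest event by taking the max of a struct ordered by time,
// then count the event types per location. Batch semantics only.
val latestPerUser = df
  .groupBy($"user")
  .agg(max(struct($"time", $"location", $"type")).as("latest"))
  .select($"latest.location".as("location"), $"latest.type".as("type"))

val countsPerLocation = latestPerUser
  .groupBy($"location")
  .agg(
    count(when($"type" === "one", true)).as("countOne"),
    count(when($"type" === "two", true)).as("countTwo"))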

Any help is appreciated.

EMR: Use extra mounted EBS volumes for spark.local.dir

2017-10-08 Thread Tushar Sudake
Hello everyone,

I'm using 'r4.8xlarge' instances on EMR for my Spark Application.
To each node, I'm attaching one 512 GB EBS volume.

By logging into the nodes, I tried to verify that this volume is set as
'spark.local.dir' by EMR automatically, but couldn't find any such
configuration.

Can someone please confirm this? Do I need to do it myself through bootstrap
steps?

Thanks,
Tushar


Re: Implement Dataset reader from SEQ file with protobuf to Dataset

2017-10-08 Thread Michael Armbrust
spark-avro would be a good example to start with.
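
For a rough idea of the overall shape such a reader could take, here is a
minimal sketch, assuming the SEQ file stores the serialized protobuf bytes as
BytesWritable values and that PersonProto is the protobuf-generated Java class
(all paths and names below are illustrative):

import scala.collection.JavaConverters._
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.sql.SparkSession

case class Person(name: String, lastName: String, phones: List[String])

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Read the SequenceFile as raw bytes, decode each value with the
// protobuf-generated class, and map it onto the case class so that
// Spark can derive an encoder for the Dataset.
val personsDataset = spark.sparkContext
  .sequenceFile("/path/to/people.seq", classOf[NullWritable], classOf[BytesWritable])
  .map { case (_, bytes) =>
    val proto = PersonProto.parseFrom(bytes.copyBytes()) // hypothetical generated class
    Person(proto.getName, proto.getLastName, proto.getPhonesList.asScala.toList)
  }
  .toDS()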

On Sun, Oct 8, 2017 at 3:00 AM, Serega Sheypak 
wrote:

> Hi, did anyone try to implement a Spark SQL Dataset reader that reads a SEQ
> file with protobuf inside into a Dataset?
>
> Imagine I have protobuf def
> Person
>  - name: String
>  - lastName: String
> - phones: List[String]
>
> and generated scala case class:
> case class Person(name:String, lastName: String, phones: List[String])
>
> I want to write some component that gives me a Dataset with a typed schema.
>
> val personsDataset = spark.read
>   .option("inferSchema", "true")[Person]
>
> Where can I take a look at references?
>


Re: How to convert Array of Json rows into Dataset of specific columns in Spark 2.2.0?

2017-10-08 Thread kant kodali
I have the following so far

private StructType getSchema() {
return new StructType()
.add("name", StringType)
.add("address", StringType)
.add("docs", StringType);
}


ds.select(explode_outer(from_json(ds.col("value"), ArrayType.apply(getSchema())))
        .as("result"))
  .selectExpr("result.*");

This didn't quite work for me, so just to clarify: I have a JSON array of
documents as my input string, and I am trying to keep the values of my name,
address, and docs columns as strings as well, except that my input array
string is flattened out by the explode function.
Any suggestions would be great.
Thanks!
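
For comparison, here is a Scala sketch of one way to get the three columns,
describing the nested address/docs fields with struct/array types instead of
StringType (field names follow the sample JSON quoted below; the Java API is
analogous):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Sketch: model the nested structure explicitly, then flatten after explode_outer.
val docSchema = new StructType()
  .add("subject", StringType)
  .add("year", IntegerType)

val rowSchema = new StructType()
  .add("name", StringType)
  .add("address", new StructType()
    .add("state", StringType)
    .add("country", StringType))
  .add("docs", ArrayType(docSchema))

val flattened = ds
  .select(explode_outer(from_json(col("value"), ArrayType(rowSchema))).as("result"))
  .selectExpr("result.name", "result.address", "result.docs")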


On Sat, Oct 7, 2017 at 10:00 AM, Jules Damji  wrote:

> You might find these blogs helpful to parse & extract data from complex
> structures:
>
> https://databricks.com/blog/2017/06/27/4-sql-high-order-lambda-functions-examine-complex-structured-data-databricks.html
>
> https://databricks.com/blog/2017/06/13/five-spark-sql-utility-functions-extract-explore-complex-data-types.html
>
> Cheers
> Jules
>
>
> Sent from my iPhone
> Pardon the dumb thumb typos :)
>
> On Oct 7, 2017, at 12:30 AM, kant kodali  wrote:
>
> I have a Dataset ds which consists of json rows.
>
> *Sample Json Row (This is just an example of one row in the dataset)*
>
> [
>   {"name": "foo", "address": {"state": "CA", "country": "USA"}, "docs": [{"subject": "english", "year": 2016}]},
>   {"name": "bar", "address": {"state": "OH", "country": "USA"}, "docs": [{"subject": "math", "year": 2017}]}
> ]
>
> ds.printSchema()
>
> root
>  |-- value: string (nullable = true)
>
> Now I want to convert into the following dataset using Spark 2.2.0
>
> name  | address                           | docs
> ------|-----------------------------------|----------------------------------------
> "foo" | {"state": "CA", "country": "USA"} | [{"subject": "english", "year": 2016}]
> "bar" | {"state": "OH", "country": "USA"} | [{"subject": "math", "year": 2017}]
>
> Preferably Java, but Scala is also fine as long as the functions are
> available in the Java API.
>
>


Re: Quick one... AWS SDK version?

2017-10-08 Thread Jonathan Kelly
Tushar,

Yes, the hadoop-aws jar installed on an emr-5.8.0 cluster was built with
AWS Java SDK 1.11.160, if that’s what you mean.

~ Jonathan
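
For anyone pinning the vanilla (non-EMR) combination by hand, a minimal
build.sbt sketch of the matching pair; hadoop-aws 2.7.3 declares aws-java-sdk
1.7.4 as its dependency, while the EMR-built jars use the newer SDK as
described above:

// build.sbt sketch: the vanilla (non-EMR) pairing discussed in this thread.
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"    % "2.2.0" % "provided",
  "org.apache.hadoop" %  "hadoop-aws"   % "2.7.3",
  "com.amazonaws"     %  "aws-java-sdk" % "1.7.4"   // the SDK version hadoop-aws 2.7.3 was built against
)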
On Sun, Oct 8, 2017 at 8:42 AM Tushar Sudake  wrote:

> Hi Jonathan,
>
> Does that mean Hadoop-AWS 2.7.3 too is built against AWS SDK 1.11.160 and
> not 1.7.4?
>
> Thanks.
>
>
> On Oct 7, 2017 3:50 PM, "Jean Georges Perrin"  wrote:
>
>
> Hey Marco,
>
> I am actually reading from S3 and I use 2.7.3, but I inherited the project
> and they use some AWS API from the Amazon SDK, whose version is like from
> yesterday :) so it's confusing, and AMZ is changing its version like crazy, so
> it's a little difficult to follow. Right now I went back to 2.7.3 and SDK
> 1.7.4...
>
> jg
>
>
> On Oct 7, 2017, at 15:34, Marco Mistroni  wrote:
>
> Hi JG,
>  out of curiosity, what's your use case? Are you writing to S3? You could use
> Spark to do that, e.g. using the hadoop package
> org.apache.hadoop:hadoop-aws:2.7.1; that will download the AWS client,
> which is in line with hadoop 2.7.1.
>
> hth
>  marco
>
> On Fri, Oct 6, 2017 at 10:58 PM, Jonathan Kelly 
> wrote:
>
>> Note: EMR builds Hadoop, Spark, et al, from source against specific
>> versions of certain packages like the AWS Java SDK, httpclient/core,
>> Jackson, etc., sometimes requiring some patches in these applications in
>> order to work with versions of these dependencies that differ from what the
>> applications may support upstream.
>>
>> For emr-5.8.0, we have built Hadoop and Spark (the Spark Kinesis
>> connector, that is, since that's the only part of Spark that actually
>> depends upon the AWS Java SDK directly) against AWS Java SDK 1.11.160
>> instead of the much older version that vanilla Hadoop 2.7.3 would otherwise
>> depend upon.
>>
>> ~ Jonathan
>>
>> On Wed, Oct 4, 2017 at 7:17 AM Steve Loughran 
>> wrote:
>>
>>> On 3 Oct 2017, at 21:37, JG Perrin  wrote:
>>>
>>> Sorry Steve – I may not have been very clear: thinking about
>>> aws-java-sdk-z.yy.xxx.jar. To the best of my knowledge, none is bundled
>>> with Spark.
>>>
>>>
>>>
>>> I know, but if you are talking to s3 via the s3a client, you will need
>>> the SDK version to match the hadoop-aws JAR of the same version of Hadoop
>>> your JARs have. Similarly, if you were using spark-kinesis, it needs to be
>>> in sync there.
>>>
>>>
>>> *From:* Steve Loughran [mailto:ste...@hortonworks.com
>>> ]
>>> *Sent:* Tuesday, October 03, 2017 2:20 PM
>>> *To:* JG Perrin 
>>> *Cc:* user@spark.apache.org
>>> *Subject:* Re: Quick one... AWS SDK version?
>>>
>>>
>>>
>>> On 3 Oct 2017, at 02:28, JG Perrin  wrote:
>>>
>>> Hey Sparkians,
>>>
>>> What version of AWS Java SDK do you use with Spark 2.2? Do you stick
>>> with the Hadoop 2.7.3 libs?
>>>
>>>
>>> You generally have to stick with the version which hadoop was built
>>> with, I'm afraid... very brittle dependency.
>>>
>>>
>
>


Re: Quick one... AWS SDK version?

2017-10-08 Thread Tushar Sudake
Hi Jonathan,

Does that mean Hadoop-AWS 2.7.3 too is built against AWS SDK 1.11.160 and
not 1.7.4?

Thanks.



Equivalent of Redshift ListAgg function in Spark (PySpark)

2017-10-08 Thread Somasundaram Sekar
Hi,



I want to concatenate multiple columns into a single column after grouping the
DataFrame.



I want a functional equivalent of the Redshift LISTAGG function:



pg_catalog.LISTAGG(column, '|') WITHIN GROUP (ORDER BY column) AS name


LISTAGG function: For each group in a query, the LISTAGG aggregate function
orders the rows for that group according to the ORDER BY expression, then
concatenates the values into a single string.
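
A rough sketch of a possible substitute using collect_list and concat_ws after
the groupBy (Scala, with illustrative column names; the same functions exist in
pyspark.sql.functions, and collect_list does not guarantee the ORDER BY
semantics of LISTAGG ... WITHIN GROUP):

import org.apache.spark.sql.functions._

// LISTAGG-style aggregation: gather the values of `column` per group
// and join them with '|'. Ordering within the group is not guaranteed.
val aggregated = df
  .groupBy(col("group_col"))
  .agg(concat_ws("|", collect_list(col("column"))).as("name"))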


Implement Dataset reader from SEQ file with protobuf to Dataset

2017-10-08 Thread Serega Sheypak
Hi, did anyone try to implement a Spark SQL Dataset reader that reads a SEQ
file with protobuf inside into a Dataset?

Imagine I have protobuf def
Person
 - name: String
 - lastName: String
- phones: List[String]

and generated scala case class:
case class Person(name:String, lastName: String, phones: List[String])

I want to write some component that gives me a Dataset with a typed schema.

val personsDataset = spark.read
  .option("inferSchema", "true")[Person]

Where can I take a look at references?