unsubscribe

2018-01-28 Thread 韩盼
unsubscribe




How and when the types of the result set are figured out in Spark?

2018-01-28 Thread kant kodali
Hi All,

I would like to know how and when the types of the result set are figured
out in Spark. For example, say I have the following DataFrame.

*inputdf*

col1 | col2 | col3
-----|------|-----
  1  |   2  |   5
  2  |   3  |   6

Now say I do something like the following (pseudo-SQL):

resultdf = select col2/2 from inputdf

resultdf.writeStream().format("es").start()

The first document in ES will be {"col2": 1} and the second document will
be {"col2": 1.5}, so I would think ES would throw a type mismatch error here
if dynamic mapping is disabled on the ES server.

My real question is: from Spark's perspective, when are the types of
resultdf figured out? Is it before writing to ES (or to any sink in general),
or only after writing the first document?
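
For what it's worth, a derived DataFrame's schema can be inspected before
anything is written. A minimal PySpark sketch (the input data and column
alias here are illustrative, not taken from the original job):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("schema-check").getOrCreate()

    # Illustrative input matching the example above.
    inputdf = spark.createDataFrame([(1, 2, 5), (2, 3, 6)],
                                    ["col1", "col2", "col3"])

    # The division yields a double column; the schema comes from the
    # analyzed logical plan, before any row reaches a sink.
    resultdf = inputdf.selectExpr("col2 / 2 AS col2")
    resultdf.printSchema()  # root |-- col2: double (nullable = true)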

Thanks!


Spark Dataframe Writer _temporary directory

2018-01-28 Thread Richard Primera
Consider a situation where multiple workflows write different partitions of
the same table.

Example:

Ten different processes are writing Parquet or ORC files for different
partitions of the same table foo, at /staging/tables/foo/partition_field=1,
/staging/tables/foo/partition_field=2, /staging/tables/foo/partition_field=3, ...

It appears to me that this currently cannot be done simultaneously against
the same directory in a consistently stable way: whenever a DataFrame writer
starts, it stores temporary files under /staging/tables/foo/_temporary, which
all writers share, and each writer deletes that directory when it finishes.
As a result, whichever writer finishes first deletes the temporary files of
all the other writers that haven't finished yet.

I believe this could be worked around by having each writer use its own
/staging/tables/foo/_temporary_someHash directory instead.

Is there currently a way to achieve this without having to edit the source
code?
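
One workaround sometimes used, without touching Spark's source, is to have
each job write directly into its own partition directory, so the _temporary
directory it creates lives under that path rather than under the shared table
root. A minimal sketch (the partition value and source table are placeholders):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("partition-writer").getOrCreate()

    partition_value = 1                      # each process gets its own value
    df = spark.table("staging_input_for_1")  # placeholder source

    # The committer's _temporary directory is created under the output path,
    # i.e. under .../partition_field=1, so concurrent jobs writing other
    # partitions do not delete each other's temporary files.
    (df.write
       .mode("overwrite")
       .parquet("/staging/tables/foo/partition_field=%d" % partition_value))

If the table is registered in a metastore, the new partition may still need
to be added afterwards (e.g. with ALTER TABLE ... ADD PARTITION).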







mapGroupsWithState in Python

2018-01-28 Thread ayan guha
Hi

I want to write something in Structured Streaming:

1. I have a dataset which has 3 columns: id, last_update_timestamp,
attribute
2. I am receiving the data through Kinesis

I want to deduplicate records based on last_updated. In batch, it looks
like:

spark.sql("select * from (Select *, row_number() OVER(Partition by id order
by last_updated desc) rank  from table1) tmp where rank =1")

But now I would like to do it in Structured Streaming. I need to maintain,
across triggers, the state of each id as of its highest last_updated, for a
certain period (24 hours).

Questions:

1. Should I use mapGroupsWithState, or is there another (SQL?) solution? Can
anyone help me write it?
2. Is mapGroupsWithState supported in Python?

Just to cover all bases: I have already tried dropDuplicates, but it keeps
the first record encountered for an id rather than updating the state:

from pyspark.sql.functions import get_json_object

unpackedDF = kinesisDF.selectExpr("cast (data as STRING) jsonData")
dataDF = unpackedDF.select(
    get_json_object(unpackedDF.jsonData, '$.header.id').alias('id'),
    get_json_object(unpackedDF.jsonData, '$.header.last_updated').cast('timestamp').alias('last_updated'),
    unpackedDF.jsonData)

dedupDF = dataDF.dropDuplicates(subset=['id']).withWatermark('last_updated', '24 hours')


So it is not working. Any help is appreciated.
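
For reference, the streaming-deduplication pattern in the Structured Streaming
guide applies the watermark before dropDuplicates and includes the event-time
column in the key, so the 24-hour watermark can expire old state. A minimal
sketch using dataDF as defined above (note this still keeps the first row seen
per key; keeping the latest row per id by last_updated appears to need
arbitrary stateful processing such as (flat)mapGroupsWithState, which as of
Spark 2.3 is exposed only in the Scala/Java Dataset API):

    # Watermark first, then deduplicate, so per-key state can be dropped
    # once it is older than 24 hours of event time.
    dedupDF = (dataDF
               .withWatermark('last_updated', '24 hours')
               .dropDuplicates(['id', 'last_updated']))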

-- 
Best Regards,
Ayan Guha


Re: write parquet with statistics min max with binary field

2018-01-28 Thread Stephen Joung
After setting `parquet.strings.signed-min-max.enabled` to `true` in
`ShowMetaCommand.java`, parquet-tools meta shows the min/max.


@@ -57,8 +57,9 @@ public class ShowMetaCommand extends ArgsOnlyCommand {

     String[] args = options.getArgs();
     String input = args[0];

     Configuration conf = new Configuration();
+    conf.set("parquet.strings.signed-min-max.enabled", "true");
     Path inputPath = new Path(input);
     FileStatus inputFileStatus = inputPath.getFileSystem(conf).getFileStatus(inputPath);
     List<Footer> footers = ParquetFileReader.readFooters(conf, inputFileStatus, false);


Result:

    row group 1:  RC:3 TS:56 OFFSET:4
    field1:       BINARY SNAPPY DO:0 FPO:4 SZ:56/56/1.00 VC:3 ENC:DELTA_BYTE_ARRAY ST:[min: a, max: c, num_nulls: 0]


For reference, this is the intended behavior introduced by PARQUET-686 [1].


[1] https://www.mail-archive.com/commits@parquet.apache.org/msg00491.html
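
If the goal is for Spark's own Parquet reader to see these binary min/max
statistics when filtering, the same flag can presumably be passed through
Spark's Hadoop configuration. A sketch under that assumption (untested here;
whether row-group skipping actually happens still depends on the parquet-mr
and Spark versions involved):

    from pyspark.sql import SparkSession

    # spark.hadoop.* keys are copied into the Hadoop Configuration handed to
    # the Parquet reader, so this is one way to forward the parquet-mr flag.
    spark = (SparkSession.builder
             .appName("parquet-minmax")
             .config("spark.hadoop.parquet.strings.signed-min-max.enabled", "true")
             .config("spark.sql.parquet.filterPushdown", "true")
             .getOrCreate())

    df = spark.read.parquet("f1").filter("field1 = 'b'")
    df.explain()  # the FileScan node should list the pushed filter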

2018-01-24 10:31 GMT+09:00 Stephen Joung :

> How can I write parquet file with min/max statistic?
>
> 2018-01-24 10:30 GMT+09:00 Stephen Joung :
>
>> Hi, I am trying to use Spark SQL filter pushdown, and specifically want to
>> use row-group skipping with Parquet files.
>>
>> And I guessed that I need parquet file with statistics min/max.
>>
>> 
>>
>> On the Spark master branch, I tried to write a single column with "a", "b",
>> "c" to a parquet file f1:
>>
>>    scala> List("a", "b", "c").toDF("field1").coalesce(1).write.parquet("f1")
>>
>> But the saved file does not have statistics (min, max):
>>
>>    $ ls f1/*.parquet
>>    f1/part-0-445036f9-7a40-4333-8405-8451faa44319-c000.snappy.parquet
>>    $ parquet-tool meta f1/*.parquet
>>    file:        file:/Users/stephen/p/spark/f1/part-0-445036f9-7a40-4333-8405-8451faa44319-c000.snappy.parquet
>>    creator:     parquet-mr version 1.8.2 (build c6522788629e590a53eb79874b95f6c3ff11f16c)
>>    extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"field1","type":"string","nullable":true,"metadata":{}}]}
>>
>>    file schema: spark_schema
>>    --------------------------------------------------------------------
>>    field1:      OPTIONAL BINARY O:UTF8 R:0 D:1
>>
>>    row group 1: RC:3 TS:48 OFFSET:4
>>    --------------------------------------------------------------------
>>    field1:      BINARY SNAPPY DO:0 FPO:4 SZ:50/48/0.96 VC:3 ENC:BIT_PACKED,RLE,PLAIN ST:[no stats for this column]
>>
>> 
>>
>> Any pointer or comment would be appreciated.
>> Thank you.
>>
>>
>


Re: S3 token times out during data frame "write.csv"

2018-01-28 Thread Jörn Franke
He is using CSV; either ORC or Parquet would be fine.
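
For context, a sketch of how temporary (session) credentials are commonly
wired into the S3A connector. The token's lifetime is fixed when the
credentials are issued by STS (the DurationSeconds parameter), not by Spark,
so long-running writes need credentials issued with a sufficient duration.
The bucket name and key values below are placeholders, and this assumes a
hadoop-aws build that provides TemporaryAWSCredentialsProvider:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("s3a-write")
             # Placeholder values; real ones come from STS (GetSessionToken / AssumeRole).
             .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                     "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
             .config("spark.hadoop.fs.s3a.access.key", "ASIA...")
             .config("spark.hadoop.fs.s3a.secret.key", "...")
             .config("spark.hadoop.fs.s3a.session.token", "...")
             .getOrCreate())

    # A columnar format (ORC/Parquet) is also generally a better fit than CSV here.
    df = spark.range(10)  # stand-in for the real DataFrame
    df.write.mode("overwrite").orc("s3a://some-bucket/some/prefix/")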

> On 28. Jan 2018, at 06:49, Gourav Sengupta  wrote:
> 
> Hi,
> 
> There is definitely a parameter, when creating temporary security
> credentials, to specify the number of minutes those credentials will be
> active. There is an upper limit of course, which is around 3 days if I
> remember correctly, and the default, as you can see, is 30 mins.
> 
> Can you let me know:
> 1. How are you generating the credentials? (the exact code)
> 2. Doing S3 writes from a local network is super suboptimal anyway, given
> the network latency and cost associated with it. So why are you doing it?
> 3. When you port your code to EMR, do you still use access keys, or do you
> have to change your code?
> 4. Any particular reason why your partition value has "-" in it? I am trying
> to understand why the partition value is 2018-01-23 instead of 20180123. Are
> you treating the partition type as String?
> 5. Have you heard of and tried using spot instances? The cost is so
> ridiculously low that there is no need to be running the code locally (I am
> assuming that since you can run the code locally, the EMR instance size and
> node type would be small).
> 6. Why are you not using the Parquet format instead of ORC? I think that
> many more products use Parquet and only Hive uses the ORC format.
> 
> Regards,
> Gourav Sengupta
> 
>> On Tue, Jan 23, 2018 at 10:58 PM, Vasyl Harasymiv wrote:
>> Hi Spark Community,
>> 
>> Saving a data frame into a file on S3 using:
>> 
>> df.write.csv(s3_location)
>> 
>> If run for longer than 30 mins, the following error persists:
>> 
>> The provided token has expired. (Service: Amazon S3; Status Code: 400;
>> Error Code: ExpiredToken;)
>> 
>> Potentially this is because there is a hardcoded session limit in the
>> temporary S3 connection from Spark.
>> 
>> One can specify the duration as per here:
>> 
>> https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html
>> 
>> One can, of course, chunk the data into sub-30-minute writes. However, is
>> there a way to change the token expiry parameter directly in Spark before
>> using "write.csv"?
>> 
>> Thanks a lot for any help!
>> Vasyl
>> 
>> 
>> 
>> 
>> 
>>> On Tue, Jan 23, 2018 at 2:46 PM, Toy  wrote:
>>> Thanks, I get this error when I switched to s3a://
>>> 
>>> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError:
>>> com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
>>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>>> 
 On Tue, 23 Jan 2018 at 15:05 Patrick Alwell wrote:
 Spark cannot read locally from S3 without the S3A protocol; you'll more
 than likely need a local copy of the data, or you'll need to use the
 proper jars to enable S3 communication from the edge to the datacenter.
 
  
 
 https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
 
  
 
 Here are the jars: 
 https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
 
  
 
 Looks like you already have them, in which case you'll have to make small
 configuration changes, e.g. s3 -> s3a
 
  
 
 Keep in mind: The Amazon JARs have proven very brittle: the version of the 
 Amazon libraries must match the versions against which the Hadoop binaries 
 were built.
 
  
 
 https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client

 From: Toy 
 Date: Tuesday, January 23, 2018 at 11:33 AM
 To: "user@spark.apache.org" 
 Subject: I can't save DataFrame from running Spark locally
 
  
 
 Hi,
 
  
 
 First of all, my Spark application runs fine in AWS EMR. However, I'm
 trying to run it locally to debug an issue. My application just parses log
 files, converts them to a DataFrame, then converts to ORC and saves to S3.
 When I run it locally I get this error:
 
  
 
 java.io.IOException: /orc/dt=2018-01-23 doesn't exist
     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at ...

Re: Vectorized ORC Reader in Apache Spark 2.3 with Apache ORC 1.4.1.

2018-01-28 Thread Nicolas Paris
Hi

Thanks for this work.

Will this affect both:
1) spark.read.format("orc").load("...")
2) spark.sql("select ... from my_orc_table_in_hive")

?


On 10 Jan 2018 at 20:14, Dongjoon Hyun wrote:
> Hi, All.
> 
> Vectorized ORC Reader is now supported in Apache Spark 2.3.
> 
>     https://issues.apache.org/jira/browse/SPARK-16060
> 
> It has been a long journey. From now on, Spark can read ORC files faster
> without a feature penalty.
> 
> Thank you for all your support, especially Wenchen Fan.
> 
> It's done by two commits.
> 
>     [SPARK-16060][SQL] Support Vectorized ORC Reader
>     https://github.com/apache/spark/commit/f44ba910f58083458e1133502e193a9d6f2bf766
> 
>     [SPARK-16060][SQL][FOLLOW-UP] add a wrapper solution for vectorized orc reader
>     https://github.com/apache/spark/commit/eaac60a1e20e29084b7151ffca964cfaa5ba99d1
> 
> Please check OrcReadBenchmark for the final speed-up from `Hive built-in ORC`
> to `Native ORC Vectorized`.
> 
>     https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
> 
> Thank you.
> 
> Bests,
> Dongjoon.




Re: Vectorized ORC Reader in Apache Spark 2.3 with Apache ORC 1.4.1.

2018-01-28 Thread Dongjoon Hyun
Hi, Nicolas.

Yes. In Apache Spark 2.3, there are new sub-improvements for SPARK-20901
(Feature parity for ORC with Parquet).
For your questions, the following three are related.

1. spark.sql.orc.impl="native"
    By default, the `native` ORC implementation (based on the latest ORC 1.4.1)
is added.
    The old one is the `hive` implementation.

2. spark.sql.orc.enableVectorizedReader="true"
    By default, the `native` ORC implementation uses the vectorized reader code
path if possible.
    Please note that vectorization (Parquet/ORC) in Apache Spark is only
supported for simple data types.

3. spark.sql.hive.convertMetastoreOrc=true
    Like Parquet, by default, Hive tables are converted into file-based
data sources to use the vectorization technique.
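
A minimal sketch of setting these on a session, using the values named above
(the app name and paths are illustrative, and defaults may differ by version):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("orc-vectorized")
             .config("spark.sql.orc.impl", "native")
             .config("spark.sql.orc.enableVectorizedReader", "true")
             .config("spark.sql.hive.convertMetastoreOrc", "true")
             .enableHiveSupport()
             .getOrCreate())

    # Both access paths from the question should then go through the native,
    # vectorized ORC reader (for simple column types).
    df1 = spark.read.format("orc").load("/path/to/orc")
    df2 = spark.sql("select * from my_orc_table_in_hive")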

Bests,
Dongjoon.



On Sun, Jan 28, 2018 at 4:15 AM, Nicolas Paris  wrote:

> Hi
>
> Thanks for this work.
>
> Will this affect both:
> 1) spark.read.format("orc").load("...")
> 2) spark.sql("select ... from my_orc_table_in_hive")
>
> ?
>
>
> On 10 Jan 2018 at 20:14, Dongjoon Hyun wrote:
> > Hi, All.
> >
> > Vectorized ORC Reader is now supported in Apache Spark 2.3.
> >
> > https://issues.apache.org/jira/browse/SPARK-16060
> >
> > It has been a long journey. From now on, Spark can read ORC files faster
> > without a feature penalty.
> >
> > Thank you for all your support, especially Wenchen Fan.
> >
> > It's done by two commits.
> >
> > [SPARK-16060][SQL] Support Vectorized ORC Reader
> > https://github.com/apache/spark/commit/f44ba910f58083458e1133502e193a9d6f2bf766
> >
> > [SPARK-16060][SQL][FOLLOW-UP] add a wrapper solution for vectorized orc reader
> > https://github.com/apache/spark/commit/eaac60a1e20e29084b7151ffca964cfaa5ba99d1
> >
> > Please check OrcReadBenchmark for the final speed-up from `Hive built-in ORC`
> > to `Native ORC Vectorized`.
> >
> > https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
> >
> > Thank you.
> >
> > Bests,
> > Dongjoon.
>