Re: Spark Utf 8 encoding

2018-11-09 Thread Sean Owen
That doesn't necessarily look like a Spark-related issue. Your
terminal seems to be displaying the glyph with a question mark because
the font lacks that symbol, maybe?
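A minimal sketch to tell the two cases apart, assuming the file really is
UTF-8 on disk: if the raw bytes of the line come back as "c3 b8" for the "ø",
only the terminal/font rendering is at fault; if they come back as "ef bf bd"
(the Unicode replacement character), the text was already decoded with the
wrong charset before it was printed. The path below is a placeholder.

// Hypothetical diagnostic: dump the raw UTF-8 bytes of the first line.
val ds = spark.read.textFile("/path/to/sample.bz2")
ds.take(1).foreach { line =>
  println(line.getBytes("UTF-8").map(b => f"${b & 0xff}%02x").mkString(" "))
}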
On Fri, Nov 9, 2018 at 7:17 PM lsn24  wrote:
>
> Hello,
>
> Per the documentation, the default character encoding of Spark is UTF-8. But
> when I try to read non-ASCII characters, Spark tends to read them as question
> marks. What am I doing wrong? Below is my syntax:
>
> val ds = spark.read.textFile("a .bz2 file from hdfs");
> ds.show();
>
> The string "KøBENHAVN"  gets displayed as "K�BENHAVN"
>
> I did the testing in the spark shell and also ran the same command as part of
> a Spark job. Both yield the same result.
>
> I don't know what I am missing. I read the documentation and couldn't find
> any explicit config for this.
>
> Any pointers will be greatly appreciated!
>
> Thanks
>
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>




Spark Utf 8 encoding

2018-11-09 Thread lsn24
Hello,

Per the documentation, the default character encoding of Spark is UTF-8. But
when I try to read non-ASCII characters, Spark tends to read them as question
marks. What am I doing wrong? Below is my syntax:

val ds = spark.read.textFile("a .bz2 file from hdfs");
ds.show();

The string "KøBENHAVN"  gets displayed as "K�BENHAVN"

I did the testing in the spark shell and also ran the same command as part of
a Spark job. Both yield the same result.

I don't know what I am missing. I read the documentation and couldn't find
any explicit config for this.
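One hedged thing worth ruling out: Spark's text source decodes the file as
UTF-8, but the shell prints strings through the JVM's default encoding, so a
non-UTF-8 file.encoding on the driver can mangle "ø" even when the data was
read correctly. A quick, purely illustrative check (it can be forced with
-Dfile.encoding=UTF-8 via the driver/executor extra Java options):

import java.nio.charset.Charset
println(Charset.defaultCharset())            // should be UTF-8
println(System.getProperty("file.encoding"))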

Any pointers will be greatly appreciated!

Thanks




--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/




Re: DataSourceV2 capability API

2018-11-09 Thread Reynold Xin
"If there is no way to report a feature (e.g., able to read missing as
null) then there is no way for Spark to take advantage of it in the first
place"

Consider this (just a hypothetical scenario): we add "supports-decimal"
in the future, because we see that a lot of data sources don't support decimal
and we want more graceful error handling. That'd break all existing data
sources.

You can say we would never add any "existing" features to the feature list
in the future, as a requirement for the feature list. But then I'm
wondering how much it really gives you, beyond telling data sources to
throw exceptions when they don't support a specific operation.
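A sketch of that hypothetical against the proposed API; the capability name
and the check are illustrative only, not an agreed design:

// Every data source written before "supports-decimal" existed would return
// false here and be rejected, even if it handles decimals fine -- that is
// the breakage described above.
val table = catalog.load(identifier)
if (!table.isSupported("supports-decimal")) {
  throw new UnsupportedOperationException("Table does not support decimal columns")
}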


On Fri, Nov 9, 2018 at 11:54 AM Ryan Blue  wrote:

> Do you have an example in mind where we might add a capability and break
> old versions of data sources?
>
> These are really for being able to tell what features a data source has.
> If there is no way to report a feature (e.g., able to read missing as null)
> then there is no way for Spark to take advantage of it in the first place.
> For the uses I've proposed, forward compatibility isn't a concern. When we
> add a capability, we add handling for it that old versions wouldn't be able
> to use anyway. The advantage is that we don't have to treat all sources the
> same.
>
> On Fri, Nov 9, 2018 at 11:32 AM Reynold Xin  wrote:
>
>> How do we deal with forward compatibility? Consider, Spark adds a new
>> "property". In the past the data source supports that property, but since
>> it was not explicitly defined, in the new version of Spark that data source
>> would be considered not supporting that property, and thus throwing an
>> exception.
>>
>>
>> On Fri, Nov 9, 2018 at 9:11 AM Ryan Blue  wrote:
>>
>>> I'd have two places. First, a class that defines properties supported
>>> and identified by Spark, like the SQLConf definitions. Second, in
>>> documentation for the v2 table API.
>>>
>>> On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
>>> wrote:
>>>
 One question is where will the list of capability strings be defined?


 --
 *From:* Ryan Blue 
 *Sent:* Thursday, November 8, 2018 2:09 PM
 *To:* Reynold Xin
 *Cc:* Spark Dev List
 *Subject:* Re: DataSourceV2 capability API


 Yes, we currently use traits that have methods. Something like
 “supports reading missing columns” doesn’t need to deliver methods. The
 other example is where we don’t have an object to test for a trait (
 scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
 done. That could be expensive so we can use a capability to fail faster.

 On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin  wrote:

> This is currently accomplished by having traits that data sources can
> extend, as well as runtime exceptions right? It's hard to argue one way vs
> another without knowing how things will evolve (e.g. how many different
> capabilities there will be).
>
>
> On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
> wrote:
>
>> Hi everyone,
>>
>> I’d like to propose an addition to DataSourceV2 tables, a capability
>> API. This API would allow Spark to query a table to determine whether it
>> supports a capability or not:
>>
>> val table = catalog.load(identifier)
>> val supportsContinuous = table.isSupported("continuous-streaming")
>>
>> There are a couple of use cases for this. First, we want to be able
>> to fail fast when a user tries to stream a table that doesn’t support it.
>> The design of our read implementation doesn’t necessarily support this. 
>> If
>> we want to share the same “scan” across streaming and batch, then we need
>> to “branch” in the API after that point, but that is at odds with failing
>> fast. We could use capabilities to fail fast and not worry about that
>> concern in the read design.
>>
>> I also want to use capabilities to change the behavior of some
>> validation rules. The rule that validates appends, for example, doesn’t
>> allow a write that is missing an optional column. That’s because the
>> current v1 sources don’t support reading when columns are missing. But
>> Iceberg does support reading a missing column as nulls, so that users can
>> add a column to a table without breaking a scheduled job that populates 
>> the
>> table. To fix this problem, I would use a table capability, like
>> read-missing-columns-as-null.
>>
>> Any comments on this approach?
>>
>> rb
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

 --
 Ryan Blue
 Software Engineer
 Netflix

>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: DataSourceV2 capability API

2018-11-09 Thread Ryan Blue
For that case, I think we would have a property that defines whether
supports-decimal is assumed or checked with the capability.

Wouldn't we have this problem no matter what the capability API is? If we
used a trait to signal decimal support, then we would have to deal with
sources that were written before the trait was introduced. That doesn't
change the need for some way to signal support for specific capabilities
like the ones I've suggested.
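A sketch of the "assumed or checked" idea as described above; the config key
is invented for illustration and `conf` stands for the session configuration:

// A capability introduced after sources already exist is assumed unless
// checking is explicitly switched on, so legacy sources keep today's behavior.
val checkDecimal = conf.getBoolean("spark.sql.sources.checkDecimalCapability", false)
val decimalSupported = !checkDecimal || table.isSupported("supports-decimal")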

On Fri, Nov 9, 2018 at 12:38 PM Reynold Xin  wrote:

> "If there is no way to report a feature (e.g., able to read missing as
> null) then there is no way for Spark to take advantage of it in the first
> place"
>
> Consider this (just a hypothetical scenario): we add "supports-decimal"
> in the future, because we see that a lot of data sources don't support decimal
> and we want more graceful error handling. That'd break all existing data
> sources.
>
> You can say we would never add any "existing" features to the feature list
> in the future, as a requirement for the feature list. But then I'm
> wondering how much it really gives you, beyond telling data sources to
> throw exceptions when they don't support a specific operation.
>
>
> On Fri, Nov 9, 2018 at 11:54 AM Ryan Blue  wrote:
>
>> Do you have an example in mind where we might add a capability and break
>> old versions of data sources?
>>
>> These are really for being able to tell what features a data source has.
>> If there is no way to report a feature (e.g., able to read missing as null)
>> then there is no way for Spark to take advantage of it in the first place.
>> For the uses I've proposed, forward compatibility isn't a concern. When we
>> add a capability, we add handling for it that old versions wouldn't be able
>> to use anyway. The advantage is that we don't have to treat all sources the
>> same.
>>
>> On Fri, Nov 9, 2018 at 11:32 AM Reynold Xin  wrote:
>>
>>> How do we deal with forward compatibility? Consider, Spark adds a new
>>> "property". In the past the data source supports that property, but since
>>> it was not explicitly defined, in the new version of Spark that data source
>>> would be considered not supporting that property, and thus throwing an
>>> exception.
>>>
>>>
>>> On Fri, Nov 9, 2018 at 9:11 AM Ryan Blue  wrote:
>>>
 I'd have two places. First, a class that defines properties supported
 and identified by Spark, like the SQLConf definitions. Second, in
 documentation for the v2 table API.

 On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
 wrote:

> One question is where will the list of capability strings be defined?
>
>
> --
> *From:* Ryan Blue 
> *Sent:* Thursday, November 8, 2018 2:09 PM
> *To:* Reynold Xin
> *Cc:* Spark Dev List
> *Subject:* Re: DataSourceV2 capability API
>
>
> Yes, we currently use traits that have methods. Something like
> “supports reading missing columns” doesn’t need to deliver methods. The
> other example is where we don’t have an object to test for a trait (
> scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
> done. That could be expensive so we can use a capability to fail faster.
>
> On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin 
> wrote:
>
>> This is currently accomplished by having traits that data sources can
>> extend, as well as runtime exceptions right? It's hard to argue one way 
>> vs
>> another without knowing how things will evolve (e.g. how many different
>> capabilities there will be).
>>
>>
>> On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> I’d like to propose an addition to DataSourceV2 tables, a capability
>>> API. This API would allow Spark to query a table to determine whether it
>>> supports a capability or not:
>>>
>>> val table = catalog.load(identifier)
>>> val supportsContinuous = table.isSupported("continuous-streaming")
>>>
>>> There are a couple of use cases for this. First, we want to be able
>>> to fail fast when a user tries to stream a table that doesn’t support 
>>> it.
>>> The design of our read implementation doesn’t necessarily support this. 
>>> If
>>> we want to share the same “scan” across streaming and batch, then we 
>>> need
>>> to “branch” in the API after that point, but that is at odds with 
>>> failing
>>> fast. We could use capabilities to fail fast and not worry about that
>>> concern in the read design.
>>>
>>> I also want to use capabilities to change the behavior of some
>>> validation rules. The rule that validates appends, for example, doesn’t
>>> allow a write that is missing an optional column. That’s because the
>>> current v1 sources don’t support reading when columns are missing. But
>>> Iceberg does support reading a missing column as nulls, so that users 
>>> 

Re: DataSourceV2 capability API

2018-11-09 Thread Ryan Blue
Do you have an example in mind where we might add a capability and break
old versions of data sources?

These are really for being able to tell what features a data source has. If
there is no way to report a feature (e.g., able to read missing as null)
then there is no way for Spark to take advantage of it in the first place.
For the uses I've proposed, forward compatibility isn't a concern. When we
add a capability, we add handling for it that old versions wouldn't be able
to use anyway. The advantage is that we don't have to treat all sources the
same.
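A sketch of that point, reusing the capability proposed earlier in the thread
(illustrative only):

// New handling is only taken when a source reports the capability, so a source
// written before the capability existed keeps the current behavior instead of
// breaking.
if (table.isSupported("read-missing-columns-as-null")) {
  // relaxed append validation: optional columns may be missing from the data
} else {
  // existing validation: every table column must be present in the write
}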

On Fri, Nov 9, 2018 at 11:32 AM Reynold Xin  wrote:

> How do we deal with forward compatibility? Consider, Spark adds a new
> "property". In the past the data source supports that property, but since
> it was not explicitly defined, in the new version of Spark that data source
> would be considered not supporting that property, and thus throwing an
> exception.
>
>
> On Fri, Nov 9, 2018 at 9:11 AM Ryan Blue  wrote:
>
>> I'd have two places. First, a class that defines properties supported and
>> identified by Spark, like the SQLConf definitions. Second, in documentation
>> for the v2 table API.
>>
>> On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
>> wrote:
>>
>>> One question is where will the list of capability strings be defined?
>>>
>>>
>>> --
>>> *From:* Ryan Blue 
>>> *Sent:* Thursday, November 8, 2018 2:09 PM
>>> *To:* Reynold Xin
>>> *Cc:* Spark Dev List
>>> *Subject:* Re: DataSourceV2 capability API
>>>
>>>
>>> Yes, we currently use traits that have methods. Something like “supports
>>> reading missing columns” doesn’t need to deliver methods. The other example
>>> is where we don’t have an object to test for a trait (
>>> scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
>>> done. That could be expensive so we can use a capability to fail faster.
>>>
>>> On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin  wrote:
>>>
 This is currently accomplished by having traits that data sources can
 extend, as well as runtime exceptions right? It's hard to argue one way vs
 another without knowing how things will evolve (e.g. how many different
 capabilities there will be).


 On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
 wrote:

> Hi everyone,
>
> I’d like to propose an addition to DataSourceV2 tables, a capability
> API. This API would allow Spark to query a table to determine whether it
> supports a capability or not:
>
> val table = catalog.load(identifier)
> val supportsContinuous = table.isSupported("continuous-streaming")
>
> There are a couple of use cases for this. First, we want to be able to
> fail fast when a user tries to stream a table that doesn’t support it. The
> design of our read implementation doesn’t necessarily support this. If we
> want to share the same “scan” across streaming and batch, then we need to
> “branch” in the API after that point, but that is at odds with failing
> fast. We could use capabilities to fail fast and not worry about that
> concern in the read design.
>
> I also want to use capabilities to change the behavior of some
> validation rules. The rule that validates appends, for example, doesn’t
> allow a write that is missing an optional column. That’s because the
> current v1 sources don’t support reading when columns are missing. But
> Iceberg does support reading a missing column as nulls, so that users can
> add a column to a table without breaking a scheduled job that populates 
> the
> table. To fix this problem, I would use a table capability, like
> read-missing-columns-as-null.
>
> Any comments on this approach?
>
> rb
> --
> Ryan Blue
> Software Engineer
> Netflix
>

>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 capability API

2018-11-09 Thread Ryan Blue
Another solution to the decimal case is using the capability API: use a
capability to signal that the table knows about `supports-decimal`. So
before the decimal support check, it would check
`table.isSupported("type-capabilities")`.
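A sketch of that two-step check (illustrative only):

// Only tables that advertise the meta-capability are held to the finer-grained
// type checks; older tables fall back to today's assumption.
val decimalSupported =
  if (table.isSupported("type-capabilities")) table.isSupported("supports-decimal")
  else true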

On Fri, Nov 9, 2018 at 12:45 PM Ryan Blue  wrote:

> For that case, I think we would have a property that defines whether
> supports-decimal is assumed or checked with the capability.
>
> Wouldn't we have this problem no matter what the capability API is? If we
> used a trait to signal decimal support, then we would have to deal with
> sources that were written before the trait was introduced. That doesn't
> change the need for some way to signal support for specific capabilities
> like the ones I've suggested.
>
> On Fri, Nov 9, 2018 at 12:38 PM Reynold Xin  wrote:
>
>> "If there is no way to report a feature (e.g., able to read missing as
>> null) then there is no way for Spark to take advantage of it in the first
>> place"
>>
>> Consider this (just a hypothetical scenario): we add "supports-decimal"
>> in the future, because we see that a lot of data sources don't support decimal
>> and we want more graceful error handling. That'd break all existing data
>> sources.
>>
>> You can say we would never add any "existing" features to the feature
>> list in the future, as a requirement for the feature list. But then I'm
>> wondering how much it really gives you, beyond telling data sources to
>> throw exceptions when they don't support a specific operation.
>>
>>
>> On Fri, Nov 9, 2018 at 11:54 AM Ryan Blue  wrote:
>>
>>> Do you have an example in mind where we might add a capability and break
>>> old versions of data sources?
>>>
>>> These are really for being able to tell what features a data source has.
>>> If there is no way to report a feature (e.g., able to read missing as null)
>>> then there is no way for Spark to take advantage of it in the first place.
>>> For the uses I've proposed, forward compatibility isn't a concern. When we
>>> add a capability, we add handling for it that old versions wouldn't be able
>>> to use anyway. The advantage is that we don't have to treat all sources the
>>> same.
>>>
>>> On Fri, Nov 9, 2018 at 11:32 AM Reynold Xin  wrote:
>>>
 How do we deal with forward compatibility? Consider, Spark adds a new
 "property". In the past the data source supports that property, but since
 it was not explicitly defined, in the new version of Spark that data source
 would be considered not supporting that property, and thus throwing an
 exception.


 On Fri, Nov 9, 2018 at 9:11 AM Ryan Blue  wrote:

> I'd have two places. First, a class that defines properties supported
> and identified by Spark, like the SQLConf definitions. Second, in
> documentation for the v2 table API.
>
> On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
> wrote:
>
>> One question is where will the list of capability strings be defined?
>>
>>
>> --
>> *From:* Ryan Blue 
>> *Sent:* Thursday, November 8, 2018 2:09 PM
>> *To:* Reynold Xin
>> *Cc:* Spark Dev List
>> *Subject:* Re: DataSourceV2 capability API
>>
>>
>> Yes, we currently use traits that have methods. Something like
>> “supports reading missing columns” doesn’t need to deliver methods. The
>> other example is where we don’t have an object to test for a trait (
>> scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
>> done. That could be expensive so we can use a capability to fail faster.
>>
>> On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin 
>> wrote:
>>
>>> This is currently accomplished by having traits that data sources
>>> can extend, as well as runtime exceptions right? It's hard to argue one 
>>> way
>>> vs another without knowing how things will evolve (e.g. how many 
>>> different
>>> capabilities there will be).
>>>
>>>
>>> On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
>>> wrote:
>>>
 Hi everyone,

 I’d like to propose an addition to DataSourceV2 tables, a
 capability API. This API would allow Spark to query a table to 
 determine
 whether it supports a capability or not:

 val table = catalog.load(identifier)
 val supportsContinuous = table.isSupported("continuous-streaming")

 There are a couple of use cases for this. First, we want to be able
 to fail fast when a user tries to stream a table that doesn’t support 
 it.
 The design of our read implementation doesn’t necessarily support 
 this. If
 we want to share the same “scan” across streaming and batch, then we 
 need
 to “branch” in the API after that point, but that is at odds with 
 failing
 fast. We could use capabilities to fail fast and not worry about that
 concern in 

Re: Arrow optimization in conversion from R DataFrame to Spark DataFrame

2018-11-09 Thread Bryan Cutler
Great work Hyukjin!  I'm not too familiar with R, but I'll take a look at
the PR.

Bryan

On Fri, Nov 9, 2018 at 9:19 AM Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> Thanks Hyukjin! Very cool results
>
> Shivaram
> On Fri, Nov 9, 2018 at 10:58 AM Felix Cheung 
> wrote:
> >
> > Very cool!
> >
> >
> > 
> > From: Hyukjin Kwon 
> > Sent: Thursday, November 8, 2018 10:29 AM
> > To: dev
> > Subject: Arrow optimization in conversion from R DataFrame to Spark
> DataFrame
> >
> > Hi all,
> >
> > I am trying to introduce R Arrow optimization by reusing PySpark Arrow
> optimization.
> >
> > It boosts R DataFrame > Spark DataFrame up to roughly 900% ~ 1200%
> faster.
> >
> > Looks working fine so far; however, I would appreciate if you guys have
> some time to take a look (https://github.com/apache/spark/pull/22954) so
> that we can directly go ahead as soon as R API of Arrow is released.
> >
> > More importantly, I want some more people who're more into Arrow R API
> side but also interested in Spark side. I have already cc'ed some people I
> know but please come, review and discuss for both Spark side and Arrow side.
> >
> > Thanks.
> >
>
>
>


[Structured Streaming] Kafka group.id is fixed

2018-11-09 Thread Anastasios Zouzias
Hi all,

I ran into the following situation with Spark Structured Streaming (SS) using
Kafka.

In a project that I work on, there is already a secured Kafka setup where
ops can issue an SSL certificate per "group.id", which has to be predefined
(or at least have its prefix predefined).

On the other hand, Spark SS fixes the group.id to

val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

see, i.e.,

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L124

I guess the Spark developers had a good reason to fix it, but is it possible to
make the prefix of the above uniqueGroupId ("spark-kafka-source") configurable?
If so, I could prepare a PR for it.

The rationale is that we do not want all Spark jobs to use the same
certificate for group ids of the form spark-kafka-source-*.
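A sketch of the change being asked for; "groupIdPrefix" is an invented option
name and `parameters` stands for the source's options map, neither is an
existing Spark API here:

// Only the prefix becomes configurable; the random suffix that keeps each
// query's consumer group unique stays as it is.
val groupIdPrefix = parameters.getOrElse("groupIdPrefix", "spark-kafka-source")
val uniqueGroupId = s"$groupIdPrefix-${UUID.randomUUID}-${metadataPath.hashCode}"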


Best regards,
Anastasios Zouzias


Re: [ANNOUNCE] Announcing Apache Spark 2.4.0

2018-11-09 Thread purna pradeep
Thanks, this is great news!

Can you please let me know if dynamic resource allocation is available in Spark
2.4?

I’m using Spark 2.3.2 on Kubernetes. Do I still need to provide executor
memory options as part of the spark-submit command, or will Spark manage the
required executor memory based on the size of the job?

On Thu, Nov 8, 2018 at 2:18 PM Marcelo Vanzin 
wrote:

> +user@
>
> >> -- Forwarded message -
> >> From: Wenchen Fan 
> >> Date: Thu, Nov 8, 2018 at 10:55 PM
> >> Subject: [ANNOUNCE] Announcing Apache Spark 2.4.0
> >> To: Spark dev list 
> >>
> >>
> >> Hi all,
> >>
> >> Apache Spark 2.4.0 is the fifth release in the 2.x line. This release
> adds Barrier Execution Mode for better integration with deep learning
> frameworks, introduces 30+ built-in and higher-order functions to deal with
> complex data type easier, improves the K8s integration, along with
> experimental Scala 2.12 support. Other major updates include the built-in
> Avro data source, Image data source, flexible streaming sinks, elimination
> of the 2GB block size limitation during transfer, Pandas UDF improvements.
> In addition, this release continues to focus on usability, stability, and
> polish while resolving around 1100 tickets.
> >>
> >> We'd like to thank our contributors and users for their contributions
> and early feedback to this release. This release would not have been
> possible without you.
> >>
> >> To download Spark 2.4.0, head over to the download page:
> http://spark.apache.org/downloads.html
> >>
> >> To view the release notes:
> https://spark.apache.org/releases/spark-release-2-4-0.html
> >>
> >> Thanks,
> >> Wenchen
> >>
> >> PS: If you see any issues with the release notes, webpage or published
> artifacts, please contact me directly off-list.
>
>
>
> --
> Marcelo
>
>
>


Can Spark avoid Container killed by Yarn?

2018-11-09 Thread Yang Zhang
My Spark SQL jobs keep failing with the error "Container exited with
a non-zero exit code 143".
I know that it is caused by memory usage exceeding the limit of
spark.yarn.executor.memoryOverhead. As shown below, a memory allocation
request failed at 18/11/08 17:36:05, and then the executor RECEIVED SIGNAL
TERM. Can a Spark executor avoid the fate of being destroyed?


my conf:
--master yarn-client \
--driver-memory 10G \
--executor-memory 10G \
--executor-cores 5 \
--num-executors 12 \
--conf "spark.executor.extraJavaOptions= -XX:MaxPermSize=256M" \
--conf "spark.sql.shuffle.partitions=200" \
--conf "spark.scheduler.mode=FAIR" \
--conf "spark.yarn.executor.memoryOverhead=2048" \


=
18/11/08 17:35:52 INFO [Executor task launch worker for task 13694]
FileScanRDD: Reading File path: hdfs://
phive.smyprd.com:8020/user/hive/warehouse/events/dt=20180103/part-0,
range: 134217728-268435456, partition values: [20180103]
18/11/08 17:35:52 INFO [Executor task launch worker for task 13700]
FileScanRDD: Reading File path: hdfs://
phive.smyprd.com:8020/user/hive/warehouse/events/dt=20180104/part-0,
range: 402653184-536870912, partition values: [20180104]
18/11/08 17:35:52 INFO [Executor task launch worker for task 13688]
FileScanRDD: Reading File path: hdfs://
phive.smyprd.com:8020/user/hive/warehouse/events/dt=20180101/part-0,
range: 134217728-268435456, partition values: [20180101]
18/11/08 17:35:52 INFO [Executor task launch worker for task 13694]
TorrentBroadcast: Started reading broadcast variable 135
18/11/08 17:35:52 INFO [Executor task launch worker for task 13694]
MemoryStore: Block broadcast_135_piece0 stored as bytes in memory
(estimated size 27.2 KB, free 1822.3 MB)
18/11/08 17:35:52 INFO [Executor task launch worker for task 13694]
TorrentBroadcast: Reading broadcast variable 135 took 3 ms
18/11/08 17:35:52 INFO [Executor task launch worker for task 13694]
MemoryStore: Block broadcast_135 stored as values in memory (estimated size
365.6 KB, free 1821.9 MB)
18/11/08 17:36:00 INFO [Executor task launch worker for task 13700]
ShuffleExternalSorter: Thread 1100 spilling sort data of 580.0 MB to disk
(0  time so far)
18/11/08 17:36:03 INFO [Executor task launch worker for task 13688]
ShuffleExternalSorter: Thread 1098 spilling sort data of 580.0 MB to disk
(0  time so far)
18/11/08 17:36:05 WARN [Executor task launch worker for task 13694]
TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
18/11/08 17:36:05 INFO [Executor task launch worker for task 13694]
ShuffleExternalSorter: Thread 1099 spilling sort data of 514.0 MB to disk
(0  time so far)
18/11/08 17:36:05 ERROR [SIGTERM handler] CoarseGrainedExecutorBackend:
RECEIVED SIGNAL TERM
18/11/08 17:36:05 INFO [Thread-2] DiskBlockManager: Shutdown hook called
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Shutdown hook called
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data5/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-e6345a15-d684-440a-a4f7-d23884ee9806
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data9/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-23870d5b-9e6f-4587-bf01-eaf4ea986293
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data7/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-65f184dc-af68-422b-9d2b-e09941ff2679
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data15/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-17488560-736e-4ba4-9ae3-f07e1e33afda
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data4/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-745de0ee-aa39-4cea-b05e-6f924006d4a9
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data6/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-2db274c9-0c45-4e15-ad42-7bce16329b31
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data10/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-6f41703c-e844-4130-9800-1cde62e8bf8c
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data3/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-6eb2ce0e-a4d6-4300-8154-965847e671ef
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data12/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-cd6c6d05-052e-4316-b7ff-342c5cfac817
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory
/data2/yarn/nm/usercache/admin/appcache/application_1527906298937_39699/spark-c702d40f-1997-4742-80ea-30a15c6ec738
18/11/08 17:36:10 INFO [Thread-2] ShutdownHookManager: Deleting directory

Re: Arrow optimization in conversion from R DataFrame to Spark DataFrame

2018-11-09 Thread Felix Cheung
Very cool!



From: Hyukjin Kwon 
Sent: Thursday, November 8, 2018 10:29 AM
To: dev
Subject: Arrow optimization in conversion from R DataFrame to Spark DataFrame

Hi all,

I am trying to introduce R Arrow optimization by reusing PySpark Arrow 
optimization.

It boosts R DataFrame > Spark DataFrame up to roughly 900% ~ 1200% faster.

Looks working fine so far; however, I would appreciate if you guys have some 
time to take a look (https://github.com/apache/spark/pull/22954) so that we can 
directly go ahead as soon as R API of Arrow is released.

More importantly, I want some more people who're more into Arrow R API side but 
also interested in Spark side. I have already cc'ed some people I know but 
please come, review and discuss for both Spark side and Arrow side.

Thanks.



Re: DataSourceV2 capability API

2018-11-09 Thread Felix Cheung
One question is where will the list of capability strings be defined?



From: Ryan Blue 
Sent: Thursday, November 8, 2018 2:09 PM
To: Reynold Xin
Cc: Spark Dev List
Subject: Re: DataSourceV2 capability API


Yes, we currently use traits that have methods. Something like “supports 
reading missing columns” doesn’t need to deliver methods. The other example is 
where we don’t have an object to test for a trait 
(scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown done. 
That could be expensive so we can use a capability to fail faster.

On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin  wrote:
This is currently accomplished by having traits that data sources can extend, 
as well as runtime exceptions right? It's hard to argue one way vs another 
without knowing how things will evolve (e.g. how many different capabilities 
there will be).


On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue  wrote:

Hi everyone,

I’d like to propose an addition to DataSourceV2 tables, a capability API. This 
API would allow Spark to query a table to determine whether it supports a 
capability or not:

val table = catalog.load(identifier)
val supportsContinuous = table.isSupported("continuous-streaming")


There are a couple of use cases for this. First, we want to be able to fail 
fast when a user tries to stream a table that doesn’t support it. The design of 
our read implementation doesn’t necessarily support this. If we want to share 
the same “scan” across streaming and batch, then we need to “branch” in the API 
after that point, but that is at odds with failing fast. We could use 
capabilities to fail fast and not worry about that concern in the read design.

I also want to use capabilities to change the behavior of some validation 
rules. The rule that validates appends, for example, doesn’t allow a write that 
is missing an optional column. That’s because the current v1 sources don’t 
support reading when columns are missing. But Iceberg does support reading a 
missing column as nulls, so that users can add a column to a table without 
breaking a scheduled job that populates the table. To fix this problem, I would 
use a table capability, like read-missing-columns-as-null.

Any comments on this approach?

rb

--
Ryan Blue
Software Engineer
Netflix


--
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 capability API

2018-11-09 Thread Reynold Xin
How do we deal with forward compatibility? Consider, Spark adds a new
"property". In the past the data source supports that property, but since
it was not explicitly defined, in the new version of Spark that data source
would be considered not supporting that property, and thus throwing an
exception.


On Fri, Nov 9, 2018 at 9:11 AM Ryan Blue  wrote:

> I'd have two places. First, a class that defines properties supported and
> identified by Spark, like the SQLConf definitions. Second, in documentation
> for the v2 table API.
>
> On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
> wrote:
>
>> One question is where will the list of capability strings be defined?
>>
>>
>> --
>> *From:* Ryan Blue 
>> *Sent:* Thursday, November 8, 2018 2:09 PM
>> *To:* Reynold Xin
>> *Cc:* Spark Dev List
>> *Subject:* Re: DataSourceV2 capability API
>>
>>
>> Yes, we currently use traits that have methods. Something like “supports
>> reading missing columns” doesn’t need to deliver methods. The other example
>> is where we don’t have an object to test for a trait (
>> scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
>> done. That could be expensive so we can use a capability to fail faster.
>>
>> On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin  wrote:
>>
>>> This is currently accomplished by having traits that data sources can
>>> extend, as well as runtime exceptions right? It's hard to argue one way vs
>>> another without knowing how things will evolve (e.g. how many different
>>> capabilities there will be).
>>>
>>>
>>> On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
>>> wrote:
>>>
 Hi everyone,

 I’d like to propose an addition to DataSourceV2 tables, a capability
 API. This API would allow Spark to query a table to determine whether it
 supports a capability or not:

 val table = catalog.load(identifier)
 val supportsContinuous = table.isSupported("continuous-streaming")

 There are a couple of use cases for this. First, we want to be able to
 fail fast when a user tries to stream a table that doesn’t support it. The
 design of our read implementation doesn’t necessarily support this. If we
 want to share the same “scan” across streaming and batch, then we need to
 “branch” in the API after that point, but that is at odds with failing
 fast. We could use capabilities to fail fast and not worry about that
 concern in the read design.

 I also want to use capabilities to change the behavior of some
 validation rules. The rule that validates appends, for example, doesn’t
 allow a write that is missing an optional column. That’s because the
 current v1 sources don’t support reading when columns are missing. But
 Iceberg does support reading a missing column as nulls, so that users can
 add a column to a table without breaking a scheduled job that populates the
 table. To fix this problem, I would use a table capability, like
 read-missing-columns-as-null.

 Any comments on this approach?

 rb
 --
 Ryan Blue
 Software Engineer
 Netflix

>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: DataSourceV2 capability API

2018-11-09 Thread Ryan Blue
I'd have two places. First, a class that defines properties supported and
identified by Spark, like the SQLConf definitions. Second, in documentation
for the v2 table API.
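A sketch of what such a definitions class could look like; the object and
constant names are invented, only the capability strings come from this thread:

// A central place for capability strings, analogous to how SQLConf
// centralizes config keys.
object TableCapabilities {
  val CONTINUOUS_STREAMING = "continuous-streaming"
  val READ_MISSING_COLUMNS_AS_NULL = "read-missing-columns-as-null"
}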

On Fri, Nov 9, 2018 at 9:00 AM Felix Cheung 
wrote:

> One question is where will the list of capability strings be defined?
>
>
> --
> *From:* Ryan Blue 
> *Sent:* Thursday, November 8, 2018 2:09 PM
> *To:* Reynold Xin
> *Cc:* Spark Dev List
> *Subject:* Re: DataSourceV2 capability API
>
>
> Yes, we currently use traits that have methods. Something like “supports
> reading missing columns” doesn’t need to deliver methods. The other example
> is where we don’t have an object to test for a trait (
> scan.isInstanceOf[SupportsBatch]) until we have a Scan with pushdown
> done. That could be expensive so we can use a capability to fail faster.
>
> On Thu, Nov 8, 2018 at 1:54 PM Reynold Xin  wrote:
>
>> This is currently accomplished by having traits that data sources can
>> extend, as well as runtime exceptions right? It's hard to argue one way vs
>> another without knowing how things will evolve (e.g. how many different
>> capabilities there will be).
>>
>>
>> On Thu, Nov 8, 2018 at 12:50 PM Ryan Blue 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> I’d like to propose an addition to DataSourceV2 tables, a capability
>>> API. This API would allow Spark to query a table to determine whether it
>>> supports a capability or not:
>>>
>>> val table = catalog.load(identifier)
>>> val supportsContinuous = table.isSupported("continuous-streaming")
>>>
>>> There are a couple of use cases for this. First, we want to be able to
>>> fail fast when a user tries to stream a table that doesn’t support it. The
>>> design of our read implementation doesn’t necessarily support this. If we
>>> want to share the same “scan” across streaming and batch, then we need to
>>> “branch” in the API after that point, but that is at odds with failing
>>> fast. We could use capabilities to fail fast and not worry about that
>>> concern in the read design.
>>>
>>> I also want to use capabilities to change the behavior of some
>>> validation rules. The rule that validates appends, for example, doesn’t
>>> allow a write that is missing an optional column. That’s because the
>>> current v1 sources don’t support reading when columns are missing. But
>>> Iceberg does support reading a missing column as nulls, so that users can
>>> add a column to a table without breaking a scheduled job that populates the
>>> table. To fix this problem, I would use a table capability, like
>>> read-missing-columns-as-null.
>>>
>>> Any comments on this approach?
>>>
>>> rb
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Behavior of SaveMode.Append when table is not present

2018-11-09 Thread Ryan Blue
Right now, it is up to the source implementation to decide what to do. I
think path-based tables (with no metastore component) treat an append as an
implicit create.

If you're thinking that relying on sources to interpret SaveMode is bad for
consistent behavior, I agree. That's why the community adopted a proposal
to standardize logical plans and the behavior expected of data sources for
the v2 API.
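A hedged illustration of the two cases; `df` and the custom format name are
placeholders:

// Built-in path-based source: the target directory is created implicitly.
df.write.mode("append").parquet("/tmp/table_that_does_not_exist_yet")

// Arbitrary source: whether Append creates, throws, or does nothing is left
// to the implementation.
df.write.format("com.example.datasource").mode("append").save()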

On Thu, Nov 8, 2018 at 11:53 PM Shubham Chaurasia 
wrote:

> Hi,
>
> For SaveMode.Append, the doc
> https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
> says
>
> *When saving a DataFrame to a data source, if data/table already exists,
> contents of the DataFrame are expected to be appended to existing data*
>
> However, it does not specify the behavior when the table does not exist.
> Does that throw an exception, create the table, or do nothing?
>
> Thanks,
> Shubham
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: [Structured Streaming] Kafka group.id is fixed

2018-11-09 Thread Cody Koeninger
That sounds reasonable to me
On Fri, Nov 9, 2018 at 2:26 AM Anastasios Zouzias  wrote:
>
> Hi all,
>
> I ran into the following situation with Spark Structured Streaming (SS) using
> Kafka.
>
> In a project that I work on, there is already a secured Kafka setup where ops
> can issue an SSL certificate per "group.id", which has to be predefined (or
> at least have its prefix predefined).
>
> On the other hand, Spark SS fixes the group.id to
>
> val uniqueGroupId = 
> s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
>
> see, i.e.,
>
> https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L124
>
> I guess the Spark developers had a good reason to fix it, but is it possible
> to make the prefix of the above uniqueGroupId ("spark-kafka-source")
> configurable? If so, I could prepare a PR for it.
>
> The rationale is that we do not want all Spark jobs to use the same
> certificate for group ids of the form spark-kafka-source-*.
>
>
> Best regards,
> Anastasios Zouzias




Re: Arrow optimization in conversion from R DataFrame to Spark DataFrame

2018-11-09 Thread Shivaram Venkataraman
Thanks Hyukjin! Very cool results

Shivaram
On Fri, Nov 9, 2018 at 10:58 AM Felix Cheung  wrote:
>
> Very cool!
>
>
> 
> From: Hyukjin Kwon 
> Sent: Thursday, November 8, 2018 10:29 AM
> To: dev
> Subject: Arrow optimization in conversion from R DataFrame to Spark DataFrame
>
> Hi all,
>
> I am trying to introduce R Arrow optimization by reusing PySpark Arrow 
> optimization.
>
> It boosts R DataFrame > Spark DataFrame up to roughly 900% ~ 1200% faster.
>
> Looks working fine so far; however, I would appreciate if you guys have some 
> time to take a look (https://github.com/apache/spark/pull/22954) so that we 
> can directly go ahead as soon as R API of Arrow is released.
>
> More importantly, I want some more people who're more into Arrow R API side 
> but also interested in Spark side. I have already cc'ed some people I know 
> but please come, review and discuss for both Spark side and Arrow side.
>
> Thanks.
>
