[jira] [Commented] (SPARK-23908) High-order function: transform(array, function) → array

2018-07-19 Thread Frederick Reiss (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549628#comment-16549628
 ] 

Frederick Reiss commented on SPARK-23908:
-

Thanks Herman, looking forward to seeing this feature!

> High-order function: transform(array, function) → array
> ---
>
> Key: SPARK-23908
> URL: https://issues.apache.org/jira/browse/SPARK-23908
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Herman van Hovell
>Priority: Major
>
> Ref: https://prestodb.io/docs/current/functions/array.html
> Returns an array that is the result of applying function to each element of 
> array:
> {noformat}
> SELECT transform(ARRAY [], x -> x + 1); -- []
> SELECT transform(ARRAY [5, 6], x -> x + 1); -- [6, 7]
> SELECT transform(ARRAY [5, NULL, 6], x -> COALESCE(x, 0) + 1); -- [6, 1, 7]
> SELECT transform(ARRAY ['x', 'abc', 'z'], x -> x || '0'); -- ['x0', 'abc0', 
> 'z0']
> SELECT transform(ARRAY [ARRAY [1, NULL, 2], ARRAY[3, NULL]], a -> filter(a, x 
> -> x IS NOT NULL)); -- [[1, 2], [3]]
> {noformat}
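For readers unfamiliar with the Presto semantics quoted above, {{transform}} and {{filter}} amount to a per-element map and filter over arrays. A minimal Python model of the quoted examples (NULL modeled as None; this is an illustration of the semantics, not Spark's implementation):

```python
# Toy model of the Presto/Spark higher-order array functions quoted above.
# transform(arr, f) maps f over each element; filter_(arr, p) keeps the
# elements where p holds. SQL NULL is modeled as Python None.

def transform(arr, f):
    return [f(x) for x in arr]

def filter_(arr, p):
    return [x for x in arr if p(x)]

print(transform([], lambda x: x + 1))                   # []
print(transform([5, 6], lambda x: x + 1))               # [6, 7]
# COALESCE(x, 0) + 1 over [5, NULL, 6]:
print(transform([5, None, 6], lambda x: (x or 0) + 1))  # [6, 1, 7]
# Nested: filter out NULLs inside each inner array:
print(transform([[1, None, 2], [3, None]],
                lambda a: filter_(a, lambda x: x is not None)))  # [[1, 2], [3]]
```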



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23908) High-order function: transform(array, function) → array

2018-07-18 Thread Frederick Reiss (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16548421#comment-16548421
 ] 

Frederick Reiss commented on SPARK-23908:
-

This Jira is marked as "in progress" with the target set to a previous release 
of Spark. Are you working on this, [~hvanhovell]?

> High-order function: transform(array, function) → array
> ---
>
> Key: SPARK-23908
> URL: https://issues.apache.org/jira/browse/SPARK-23908
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Xiao Li
>Assignee: Herman van Hovell
>Priority: Major
>
> Ref: https://prestodb.io/docs/current/functions/array.html
> Returns an array that is the result of applying function to each element of 
> array:
> {noformat}
> SELECT transform(ARRAY [], x -> x + 1); -- []
> SELECT transform(ARRAY [5, 6], x -> x + 1); -- [6, 7]
> SELECT transform(ARRAY [5, NULL, 6], x -> COALESCE(x, 0) + 1); -- [6, 1, 7]
> SELECT transform(ARRAY ['x', 'abc', 'z'], x -> x || '0'); -- ['x0', 'abc0', 
> 'z0']
> SELECT transform(ARRAY [ARRAY [1, NULL, 2], ARRAY[3, NULL]], a -> filter(a, x 
> -> x IS NOT NULL)); -- [[1, 2], [3]]
> {noformat}






[jira] [Commented] (SPARK-20964) Make some keywords reserved along with the ANSI/SQL standard

2018-03-21 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16408578#comment-16408578
 ] 

Frederick Reiss commented on SPARK-20964:
-

SQL2016 prohibits the use of reserved words in unquoted identifiers. See 
[http://jtc1sc32.org/doc/N2301-2350/32N2311T-text_for_ballot-CD_9075-2.pdf], 
page 176:
 _26) The case-normal form of <regular identifier body> shall not be equal, 
according to the comparison rules in Subclause 8.2, "<comparison predicate>", to 
any <reserved word> (with every letter that is a lower-case letter replaced by 
the corresponding upper-case letter or letters), treated as the repetition of a 
<character string literal> that specifies a <character set specification> of 
SQL_IDENTIFIER._

That said, strict enforcement of that rule will break existing queries every 
time a new reserved word is added. That risk in turn leads users to defensively 
quote *every* identifier, and you end up with SQL like this:

{{SELECT "R"."A", "S"."B"}}
{{ FROM "MYSCHEMA"."R" as "R", "MYSCHEMA"."S" as "S"}}

DB2 allows reserved words to be used unquoted; for example, {{SELECT * FROM 
SYSIBM.SYSTABLES WHERE, WHERE}} (see 
[https://www.ibm.com/support/knowledgecenter/en/SSEPEK_11.0.0/sqlref/src/tpc/db2z_reservedwords.html]),
 which lets users use a more literate form of SQL.
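The maintenance hazard above can be made concrete: an unquoted identifier that is legal today becomes illegal the moment its upper-cased form joins the reserved set, while a quoted identifier is always safe. A toy validity check (the word lists are illustrative, not Spark's actual grammar):

```python
# Toy model of the reserved-word hazard discussed above. The word lists
# are hypothetical; real grammars carry hundreds of reserved words.
RESERVED_V1 = {"SELECT", "FROM", "WHERE"}
RESERVED_V2 = RESERVED_V1 | {"OFFSET"}  # a new release reserves one more word

def identifier_ok(name, reserved):
    if name.startswith('"') and name.endswith('"'):
        return True                       # quoted identifiers never clash
    return name.upper() not in reserved   # case-normal form must not be reserved

print(identifier_ok("offset", RESERVED_V1))    # True  -- legal today
print(identifier_ok("offset", RESERVED_V2))    # False -- broken by the upgrade
print(identifier_ok('"offset"', RESERVED_V2))  # True  -- defensive quoting survives
```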

> Make some keywords reserved along with the ANSI/SQL standard
> 
>
> Key: SPARK-20964
> URL: https://issues.apache.org/jira/browse/SPARK-20964
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Takeshi Yamamuro
>Priority: Minor
>
> The current Spark has many non-reserved words that are essentially reserved 
> in the ANSI/SQL standard 
> (http://developer.mimer.se/validator/sql-reserved-words.html). 
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4#L709
> This is because there are many datasources (for instance twitter4j) that 
> unfortunately use reserved keywords for column names (See [~hvanhovell]'s 
> comments: https://github.com/apache/spark/pull/18079#discussion_r118842186). 
> We might fix this issue in future major releases.






[jira] [Updated] (SPARK-21084) Improvements to dynamic allocation for notebook use cases

2017-06-14 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-21084:

Description: 
One important application of Spark is to support many notebook users with a 
single YARN or Spark Standalone cluster.  We at IBM have seen this requirement 
across multiple deployments of Spark: on-premises and private cloud deployments 
at our clients, as well as on the IBM cloud.  The scenario goes something like 
this: "Every morning at 9am, 500 analysts log into their computers and start 
running Spark notebooks intermittently for the next 8 hours." I'm sure that 
many other members of the community are interested in making similar scenarios 
work.

Dynamic allocation is supposed to support these kinds of use cases by shifting 
cluster resources towards users who are currently executing scalable code.  In 
our own testing, we have encountered a number of issues with using the current 
implementation of dynamic allocation for this purpose:
*Issue #1: Starvation.* A Spark job acquires all available containers, 
preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 
seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that 
have cached RDD partitions hold onto executors indefinitely, denying those 
resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.*  Applications repeatedly 
lose partitions of cached RDDs because the underlying executors are removed; 
the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making 
enhancements to Spark.
Here's a high-level summary of the current planned work:
* [SPARK-21097]: Preserve an executor's cached data when shutting down the 
executor.
* (JIRA TBD): Make Spark give up executors in a controlled fashion when the RM 
indicates it is running low on capacity.
* (JIRA TBD): Reduce the delay for dynamic allocation to spin up new executors.

Note that this overall plan is subject to change, and other members of the 
community should feel free to suggest changes and to help out.

  was:
One important application of Spark is to support many notebook users with a 
single YARN or Spark Standalone cluster.  We at IBM have seen this requirement 
across multiple deployments of Spark: on-premises and private cloud deployments 
at our clients, as well as on the IBM cloud.  The scenario goes something like 
this: "Every morning at 9am, 500 analysts log into their computers and start 
running Spark notebooks intermittently for the next 8 hours." I'm sure that 
many other members of the community are interested in making similar scenarios 
work.

Dynamic allocation is supposed to support these kinds of use cases by shifting 
cluster resources towards users who are currently executing scalable code.  In 
our own testing, we have encountered a number of issues with using the current 
implementation of dynamic allocation for this purpose:
*Issue #1: Starvation.* A Spark job acquires all available containers, 
preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 
seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that 
have cached RDD partitions hold onto executors indefinitely, denying those 
resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.*  Applications repeatedly 
lose partitions of cached RDDs because the underlying executors are removed; 
the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making 
enhancements to Spark.
Here's a high-level summary of the current set of planned enhancements:
* [SPARK-21097]:Preserve an executor's cached data when shutting down the 
executor 

Note that this overall plan is subject to change, and other members of the 
community should feel free to suggest changes and to help out.


> Improvements to dynamic allocation for notebook use cases
> -
>
> Key: SPARK-21084
> URL: https://issues.apache.org/jira/browse/SPARK-21084
> Project: Spark
>  Issue Type: Umbrella
>  Components: Block Manager, Scheduler, Spark Core, YARN
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Frederick Reiss
>
> One important application of Spark is to support many notebook users with a 
> single YARN or Spark Standalone cluster.  We at IBM have seen this 
> requirement across multiple deployments of Spark: on-premises and private 
> cloud deployments at our clients, as well as on the IBM cloud.  The scenario 

[jira] [Updated] (SPARK-21084) Improvements to dynamic allocation for notebook use cases

2017-06-14 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-21084:

Description: 
One important application of Spark is to support many notebook users with a 
single YARN or Spark Standalone cluster.  We at IBM have seen this requirement 
across multiple deployments of Spark: on-premises and private cloud deployments 
at our clients, as well as on the IBM cloud.  The scenario goes something like 
this: "Every morning at 9am, 500 analysts log into their computers and start 
running Spark notebooks intermittently for the next 8 hours." I'm sure that 
many other members of the community are interested in making similar scenarios 
work.

Dynamic allocation is supposed to support these kinds of use cases by shifting 
cluster resources towards users who are currently executing scalable code.  In 
our own testing, we have encountered a number of issues with using the current 
implementation of dynamic allocation for this purpose:
*Issue #1: Starvation.* A Spark job acquires all available containers, 
preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 
seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that 
have cached RDD partitions hold onto executors indefinitely, denying those 
resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.*  Applications repeatedly 
lose partitions of cached RDDs because the underlying executors are removed; 
the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making 
enhancements to Spark.
Here's a high-level summary of the current set of planned enhancements:
* [SPARK-21097]:Preserve an executor's cached data when shutting down the 
executor 

Note that this overall plan is subject to change, and other members of the 
community should feel free to suggest changes and to help out.

  was:
One important application of Spark is to support many notebook users with a 
single YARN or Spark Standalone cluster.  We at IBM have seen this requirement 
across multiple deployments of Spark: on-premises and private cloud deployments 
at our clients, as well as on the IBM cloud.  The scenario goes something like 
this: "Every morning at 9am, 500 analysts log into their computers and start 
running Spark notebooks intermittently for the next 8 hours." I'm sure that 
many other members of the community are interested in making similar scenarios 
work.

Dynamic allocation is supposed to support these kinds of use cases by shifting 
cluster resources towards users who are currently executing scalable code.  In 
our own testing, we have encountered a number of issues with using the current 
implementation of dynamic allocation for this purpose:
*Issue #1: Starvation.* A Spark job acquires all available containers, 
preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 
seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that 
have cached RDD partitions hold onto executors indefinitely, denying those 
resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.*  Applications repeatedly 
lose partitions of cached RDDs because the underlying executors are removed; 
the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making 
enhancements to Spark.



> Improvements to dynamic allocation for notebook use cases
> -
>
> Key: SPARK-21084
> URL: https://issues.apache.org/jira/browse/SPARK-21084
> Project: Spark
>  Issue Type: Umbrella
>  Components: Block Manager, Scheduler, Spark Core, YARN
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Frederick Reiss
>
> One important application of Spark is to support many notebook users with a 
> single YARN or Spark Standalone cluster.  We at IBM have seen this 
> requirement across multiple deployments of Spark: on-premises and private 
> cloud deployments at our clients, as well as on the IBM cloud.  The scenario 
> goes something like this: "Every morning at 9am, 500 analysts log into their 
> computers and start running Spark notebooks intermittently for the next 8 
> hours." I'm sure that many other members of the community are interested in 
> making similar scenarios work.
> 
> Dynamic allocation is supposed to support these kinds of use cases by 
> shifting cluster resources towards users who are currently executing scalable 
> code.  In our own testing, we have encountered a 

[jira] [Commented] (SPARK-21084) Improvements to dynamic allocation for notebook use cases

2017-06-14 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049669#comment-16049669
 ] 

Frederick Reiss commented on SPARK-21084:
-

[~sowen] thanks for having a look at this JIRA and giving feedback!

I must confess that, when our product groups first brought these issues to my 
attention, my initial response was similar to yours. Each of the issues 
described above can be fixed *in isolation* by reconfiguring Spark and/or the 
resource manager.  The problem is that every such fix makes the other issues 
worse.  We spent a number of weeks playing best practices whack-a-mole before 
resigning ourselves to making some targeted improvements to Spark itself.

I'll update the description of this JIRA in a moment with a high-level 
description of the Spark changes we're currently looking into.

In the meantime, here's a quick summary of what we ran into while attempting to 
devise a workable configuration of dynamic allocation for notebook users:
Issue #1 (starvation): The obvious fix here is preemption. But there is 
currently no way to preempt an executor gently. The only option is to shut down 
the executor and drop its data, which leads to issues #2 and #4.  Worse, 
Spark's scheduling and cache management are opaque to the resource manager, so 
the RM makes arbitrary choices of which executor to shoot.
Another approach is to configure 
{{spark.dynamicAllocation.cachedExecutorIdleTimeout}} so that notebook sessions 
voluntarily give up executors, even when those executors have cached data. But 
this leads to issues #2 and #4.

Issue #2 (request latency): This issue has two root causes: 
a) It takes a noticeable amount of time to start and ramp up new executors.
b) Spark defers the cost of issue #4 (losing cached data) until a job attempts 
to consume the missing data.
For root cause (a), the obvious solution is to reserve a permanent minimum pool 
of executors for each notebook user by setting the 
{{spark.dynamicAllocation.minExecutors}} parameter to a sufficiently high 
value. But tying down containers in this way leaves fewer resources for other 
users, exacerbating issues #1, #3, and #4. The reserved executors are likely to 
be idle most of the time, because notebook users alternate between running 
Spark jobs, running local computation in the notebook kernel, and looking at 
results in the web browser. 
See issue #4 below for what happens when you try to address root cause (b) with 
config changes.

Issue #3 (unfair allocation of CPU): The obvious fix here is to set 
{{spark.dynamicAllocation.cachedExecutorIdleTimeout}} so that notebook sessions 
voluntarily give up executors, even when those executors have cached data. But 
this leads to issues #2 and #4. One can also reduce the value of 
{{spark.dynamicAllocation.maxExecutors}}, but that puts a cap on the degree of 
parallelism that a given user can access, leading to more of issue #2.

Issue #4 (loss of cached data): The obvious fix here is to set 
{{spark.dynamicAllocation.cachedExecutorIdleTimeout}} to infinity. But then any 
notebook user who has called RDD.cache() at some point in the past will tie 
down a large pool of containers indefinitely, leading to issues #1, #2, and #3. 
If you attempt to limit the size of this large pool by reducing 
{{spark.dynamicAllocation.maxExecutors}}, you limit the peak performance that 
the notebook user can get out of Spark, leading to issue #2.
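The whack-a-mole described above can be summarized as a configuration sketch. The values below are purely illustrative, not recommendations; as discussed, each knob mitigates one issue while worsening at least one other:

```properties
# Illustrative spark-defaults.conf fragment for the knobs discussed above.
# Every value trades one of issues #1-#4 against another.
spark.dynamicAllocation.enabled                    true
spark.dynamicAllocation.minExecutors               2     # fights #2(a), worsens #1/#3/#4
spark.dynamicAllocation.maxExecutors               20    # caps #3, worsens #2
spark.dynamicAllocation.cachedExecutorIdleTimeout  600s  # finite: #2/#4; infinite: #1/#2/#3
```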


> Improvements to dynamic allocation for notebook use cases
> -
>
> Key: SPARK-21084
> URL: https://issues.apache.org/jira/browse/SPARK-21084
> Project: Spark
>  Issue Type: Umbrella
>  Components: Block Manager, Scheduler, Spark Core, YARN
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Frederick Reiss
>
> One important application of Spark is to support many notebook users with a 
> single YARN or Spark Standalone cluster.  We at IBM have seen this 
> requirement across multiple deployments of Spark: on-premises and private 
> cloud deployments at our clients, as well as on the IBM cloud.  The scenario 
> goes something like this: "Every morning at 9am, 500 analysts log into their 
> computers and start running Spark notebooks intermittently for the next 8 
> hours." I'm sure that many other members of the community are interested in 
> making similar scenarios work.
> 
> Dynamic allocation is supposed to support these kinds of use cases by 
> shifting cluster resources towards users who are currently executing scalable 
> code.  In our own testing, we have encountered a number of issues with using 
> the current implementation of dynamic allocation for this purpose:
> *Issue #1: Starvation.* A Spark job acquires all available containers, 
> preventing other jobs or applications from starting.
> *Issue #2: Request latency.* Jobs 

[jira] [Updated] (SPARK-21084) Improvements to dynamic allocation for notebook use cases

2017-06-13 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-21084:

Description: 
One important application of Spark is to support many notebook users with a 
single YARN or Spark Standalone cluster.  We at IBM have seen this requirement 
across multiple deployments of Spark: on-premises and private cloud deployments 
at our clients, as well as on the IBM cloud.  The scenario goes something like 
this: "Every morning at 9am, 500 analysts log into their computers and start 
running Spark notebooks intermittently for the next 8 hours." I'm sure that 
many other members of the community are interested in making similar scenarios 
work.

Dynamic allocation is supposed to support these kinds of use cases by shifting 
cluster resources towards users who are currently executing scalable code.  In 
our own testing, we have encountered a number of issues with using the current 
implementation of dynamic allocation for this purpose:
*Issue #1: Starvation.* A Spark job acquires all available containers, 
preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 
seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that 
have cached RDD partitions hold onto executors indefinitely, denying those 
resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.*  Applications repeatedly 
lose partitions of cached RDDs because the underlying executors are removed; 
the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making 
enhancements to Spark.


  was:
One important application of Spark is to support many notebook users with a 
single YARN or Spark Standalone cluster.  We at IBM have seen this requirement 
across multiple deployments of Spark: on-premises and private cloud deployments 
at our clients, as well as on the IBM cloud.  The scenario goes something like 
this: "Every morning at 9am, 500 analysts log into their computers and start 
running Spark notebooks intermittently for the next 8 hours." I'm sure that 
many other members of the community are interested in making similar scenarios 
work.

Dynamic allocation is supposed to support these kinds of use cases by shifting 
cluster resources towards users who are currently executing scalable code.  In 
our own testing, we have encountered a number of issues with using the current 
implementation of dynamic allocation for this purpose:
*Issue #1: Starvation.* A Spark job acquires all available YARN containers, 
preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 
seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that 
have cached RDD partitions hold onto executors indefinitely, denying those 
resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.*  Applications repeatedly 
lose partitions of cached RDDs because the underlying executors are removed; 
the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making 
enhancements to Spark.



> Improvements to dynamic allocation for notebook use cases
> -
>
> Key: SPARK-21084
> URL: https://issues.apache.org/jira/browse/SPARK-21084
> Project: Spark
>  Issue Type: Umbrella
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Frederick Reiss
>
> One important application of Spark is to support many notebook users with a 
> single YARN or Spark Standalone cluster.  We at IBM have seen this 
> requirement across multiple deployments of Spark: on-premises and private 
> cloud deployments at our clients, as well as on the IBM cloud.  The scenario 
> goes something like this: "Every morning at 9am, 500 analysts log into their 
> computers and start running Spark notebooks intermittently for the next 8 
> hours." I'm sure that many other members of the community are interested in 
> making similar scenarios work.
> 
> Dynamic allocation is supposed to support these kinds of use cases by 
> shifting cluster resources towards users who are currently executing scalable 
> code.  In our own testing, we have encountered a number of issues with using 
> the current implementation of dynamic allocation for this purpose:
> *Issue #1: Starvation.* A Spark job acquires all available containers, 
> preventing other jobs or applications from starting.
> *Issue #2: Request latency.* Jobs that would normally finish in less than 30 
> seconds take 2-4x 

[jira] [Created] (SPARK-21084) Improvements to dynamic allocation for notebook use cases

2017-06-13 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-21084:
---

 Summary: Improvements to dynamic allocation for notebook use cases
 Key: SPARK-21084
 URL: https://issues.apache.org/jira/browse/SPARK-21084
 Project: Spark
  Issue Type: Umbrella
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Frederick Reiss


One important application of Spark is to support many notebook users with a 
single YARN or Spark Standalone cluster.  We at IBM have seen this requirement 
across multiple deployments of Spark: on-premises and private cloud deployments 
at our clients, as well as on the IBM cloud.  The scenario goes something like 
this: "Every morning at 9am, 500 analysts log into their computers and start 
running Spark notebooks intermittently for the next 8 hours." I'm sure that 
many other members of the community are interested in making similar scenarios 
work.

Dynamic allocation is supposed to support these kinds of use cases by shifting 
cluster resources towards users who are currently executing scalable code.  In 
our own testing, we have encountered a number of issues with using the current 
implementation of dynamic allocation for this purpose:
*Issue #1: Starvation.* A Spark job acquires all available YARN containers, 
preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 
seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that 
have cached RDD partitions hold onto executors indefinitely, denying those 
resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.*  Applications repeatedly 
lose partitions of cached RDDs because the underlying executors are removed; 
the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making 
enhancements to Spark.







[jira] [Commented] (SPARK-18127) Add hooks and extension points to Spark

2017-04-25 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983872#comment-15983872
 ] 

Frederick Reiss commented on SPARK-18127:
-

Is there a design document or a public design and requirements discussion 
associated with this JIRA?

> Add hooks and extension points to Spark
> ---
>
> Key: SPARK-18127
> URL: https://issues.apache.org/jira/browse/SPARK-18127
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Srinath
>Assignee: Sameer Agarwal
> Fix For: 2.2.0
>
>
> As a Spark user I want to be able to customize my spark session. I currently 
> want to be able to do the following things:
> # I want to be able to add custom analyzer rules. This allows me to implement 
> my own logical constructs; an example of this could be a recursive operator.
> # I want to be able to add my own analysis checks. This allows me to catch 
> problems with spark plans early on. An example of this can be some datasource 
> specific checks.
> # I want to be able to add my own optimizations. This allows me to optimize 
> plans in different ways, for instance when you use a very different cluster 
> (for example a one-node X1 instance). This supersedes the current 
> {{spark.experimental}} methods
> # I want to be able to add my own planning strategies. This supersedes the 
> current {{spark.experimental}} methods. This allows me to plan my own 
> physical plan, an example of this would to plan my own heavily integrated 
> data source (CarbonData for example).
> # I want to be able to use my own customized SQL constructs. An example of 
> this would supporting my own dialect, or be able to add constructs to the 
> current SQL language. I should not have to implement a complete parse, and 
> should be able to delegate to an underlying parser.
> # I want to be able to track modifications and calls to the external catalog. 
> I want this API to be stable. This allows me to do synchronize with other 
> systems.
> This API should modify the SparkSession when the session gets started, and it 
> should NOT change the session in flight.
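The last constraint quoted above (customize at session start, never in flight) is the core of the extension-point pattern being requested. A generic sketch of that pattern, deliberately not Spark's API (all names here are illustrative):

```python
# Generic sketch of the requested extension-point pattern: a builder
# accepts injected rules until the session is created, then freezes.
# Not Spark's API; SessionBuilder/Session are illustrative names.
class Session:
    def __init__(self, rules):
        self._rules = rules

    def optimize(self, plan):
        # Apply each injected rule in registration order.
        for rule in self._rules:
            plan = rule(plan)
        return plan

class SessionBuilder:
    def __init__(self):
        self._rules = []
        self._built = False

    def with_rule(self, rule):
        if self._built:
            raise RuntimeError("session already started; cannot change it in flight")
        self._rules.append(rule)
        return self

    def build(self):
        self._built = True
        return Session(list(self._rules))

session = SessionBuilder().with_rule(str.strip).with_rule(str.upper).build()
print(session.optimize("  select 1  "))  # SELECT 1
```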






[jira] [Commented] (SPARK-13534) Implement Apache Arrow serializer for Spark DataFrame for use in DataFrame.toPandas

2016-10-21 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15596756#comment-15596756
 ] 

Frederick Reiss commented on SPARK-13534:
-

We ([~bryanc], [~holdenk], [~yinxusen], and myself) are looking into this.
Here's a rough outline of the current planned approach:
- Add a dependency on Arrow 0.1's Java and Scala APIs to Spark.
- Add a new developer API method to Dataset, {{collectAsArrow()}}, that returns 
an array of byte arrays, where each byte array contains a block of records in 
Arrow format. The conversion to Arrow will be a streamlined version of the 
Parquet conversion in {{ParquetWriteSupport}} (minus all the callbacks and 
levels of indirection). Conversion of complex types (Struct, Array, Map) to 
Arrow will not be supported in this version.
- Modify PySpark's {{DataFrame.toPandas}} method to use the following logic:
{noformat}
if (the schema of the DataFrame does not contain complex types)
Call collectAsArrow() on the underlying Scala Dataset.
Pull the resulting buffers of Arrow data over to the Python process.
Use Arrow's Python APIs to convert the buffers into a single Pandas 
dataframe.
else
Use the existing code as a slow-path conversion.
{noformat}
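The planned dispatch can be sketched as a small Python function. The helper names ({{collect_as_arrow}}, {{slow_path_to_pandas}}) are placeholders standing in for the APIs described above, not real PySpark functions:

```python
# Sketch of the planned toPandas dispatch described above. The callables
# passed in (collect_as_arrow, slow_path_to_pandas) are placeholders for
# the fast Arrow path and the existing row-based path; not real APIs.
COMPLEX_TYPES = {"struct", "array", "map"}

def has_complex_types(schema):
    """schema: list of (column_name, type_name) pairs."""
    return any(t in COMPLEX_TYPES for _, t in schema)

def to_pandas(schema, collect_as_arrow, slow_path_to_pandas):
    if not has_complex_types(schema):
        # Fast path: pull Arrow record batches and assemble one frame.
        return ("arrow", collect_as_arrow())
    # Slow path: existing row-by-row conversion.
    return ("rows", slow_path_to_pandas())

path, _ = to_pandas([("id", "bigint"), ("name", "string")],
                    lambda: b"batches", lambda: [])
print(path)  # arrow
```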

> Implement Apache Arrow serializer for Spark DataFrame for use in 
> DataFrame.toPandas
> ---
>
> Key: SPARK-13534
> URL: https://issues.apache.org/jira/browse/SPARK-13534
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Reporter: Wes McKinney
>
> The current code path for accessing Spark DataFrame data in Python using 
> PySpark passes through an inefficient serialization-deserialiation process 
> that I've examined at a high level here: 
> https://gist.github.com/wesm/0cb5531b1c2e346a0007. Currently, RDD[Row] 
> objects are being deserialized in pure Python as a list of tuples, which are 
> then converted to pandas.DataFrame using its {{from_records}} alternate 
> constructor. This also uses a large amount of memory.
> For flat (no nested types) schemas, the Apache Arrow memory layout 
> (https://github.com/apache/arrow/tree/master/format) can be deserialized to 
> {{pandas.DataFrame}} objects with comparatively small overhead compared with 
> memcpy / system memory bandwidth -- Arrow's bitmasks must be examined, 
> replacing the corresponding null values with pandas's sentinel values (None 
> or NaN as appropriate).
> I will be contributing patches to Arrow in the coming weeks for converting 
> between Arrow and pandas in the general case, so if Spark can send Arrow 
> memory to PySpark, we will hopefully be able to increase the Python data 
> access throughput by an order of magnitude or more. I propose to add an new 
> serializer for Spark DataFrame and a new method that can be invoked from 
> PySpark to request a Arrow memory-layout byte stream, prefixed by a data 
> header indicating array buffer offsets and sizes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-15777) Catalog federation

2016-10-03 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15542995#comment-15542995
 ] 

Frederick Reiss commented on SPARK-15777:
-

Thanks for posting this design doc. Some comments from a quick read-through:
* I'm having some trouble following the description of optimizer rule 
pluggability in the current design doc. If the user is connected to multiple 
external data sources, will the optimizer rules for all those data sources be 
active all the time, even when running queries that only touch a single data 
source? If there are rules active from more than one external data source, in 
what order will those rules be evaluated? Will the order of evaluation (and 
hence the query plan) change if the user connects to the data sources in a 
different order?  Note that if you could post a link to the WIP implementation 
it would probably clear up my questions here quickly.
* Limitation number (2) seems pretty significant to me.  At a site that uses 
multiple external data sources, users will need to include boilerplate at the 
beginning of every application to connect to all of the data sources' catalogs 
explicitly. That situation would be similar to the problem that motivated this 
JIRA in the first place. I think it would be good to include a mechanism for 
storing a persistent "meta-catalog" of external catalogs.
* I don't see a mention of the namespace handling issues that the description 
for this JIRA mentions. With the current design, it looks like two external 
catalogs could define the same table name. Did I miss something, or is the plan 
to push out resolving namespace conflicts to a follow-on JIRA?

> Catalog federation
> --
>
> Key: SPARK-15777
> URL: https://issues.apache.org/jira/browse/SPARK-15777
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Reynold Xin
> Attachments: SparkFederationDesign.pdf
>
>
> This is a ticket to track progress to support federating multiple external 
> catalogs. This would require establishing an API (similar to the current 
> ExternalCatalog API) for getting information about external catalogs, and 
> ability to convert a table into a data source table.
> As part of this, we would also need to be able to support more than a 
> two-level table identifier (database.table). At the very least we would need 
> a three level identifier for tables (catalog.database.table). A possibly 
> direction is to support arbitrary level hierarchical namespaces similar to 
> file systems.
> Once we have this implemented, we can convert the current Hive catalog 
> implementation into an external catalog that is "mounted" into an internal 
> catalog.
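As a rough illustration of the multi-level identifiers the description calls for, a toy resolver might default missing levels like so. The {{resolve}} helper and its defaults are hypothetical; in real Spark this logic would live in Catalyst's analyzer, not user code.

```python
def resolve(identifier, default_catalog="hive", default_db="default"):
    """Resolve a (possibly partial) table identifier to a full
    (catalog, database, table) triple, filling in defaults for
    whichever leading levels the user omitted."""
    parts = identifier.split(".")
    if len(parts) == 3:                      # catalog.database.table
        return tuple(parts)
    if len(parts) == 2:                      # database.table
        return (default_catalog, parts[0], parts[1])
    return (default_catalog, default_db, parts[0])  # bare table name

print(resolve("prod.sales.orders"))  # ('prod', 'sales', 'orders')
print(resolve("orders"))             # ('hive', 'default', 'orders')
```

Arbitrary-depth hierarchical namespaces would generalize this to a list of path segments rather than a fixed triple.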






[jira] [Commented] (SPARK-10815) API design: data sources and sinks

2016-09-28 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15530313#comment-15530313
 ] 

Frederick Reiss commented on SPARK-10815:
-

Thanks for the clarification. Do you have a design to share at this point, or 
were you intending to solicit a design from members of the community?

> API design: data sources and sinks
> --
>
> Key: SPARK-10815
> URL: https://issues.apache.org/jira/browse/SPARK-10815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL, Streaming
>Reporter: Reynold Xin
>
> The existing (in 2.0) source/sink interface for structured streaming depends 
> on RDDs. This dependency has two issues:
> 1. The RDD interface is wide and difficult to stabilize across versions. This 
> is similar to point 1 in https://issues.apache.org/jira/browse/SPARK-15689. 
> Ideally, a source/sink implementation created for Spark 2.x should work in 
> Spark 10.x, assuming the JVM is still around.
> 2. It is difficult to swap in/out a different execution engine.
> The purpose of this ticket is to create a stable interface that addresses the 
> above two.






[jira] [Commented] (SPARK-17631) Structured Streaming - Do we need to output results through http API?

2016-09-27 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528403#comment-15528403
 ] 

Frederick Reiss commented on SPARK-17631:
-

I can think of two use cases where one might want to have the option to send 
the output of a Structured Streaming application out over HTTP.

The first use case involves using HTTP as a lightweight protocol for 
transferring bulk data out of a Structured Streaming query to a remote system. 
In this case, I would think the best approach would be to use a 
{{ForeachSink}}. That way, the HTTP connections would originate from the 
executors instead of having all the data pulled into the driver. I'm not sure 
that there's really a need to add a built-in sink type for this use case, as 
the existing {{ForeachSink}} provides most of what one would need.

The second use case involves tunneling data out of the Spark cluster to an 
existing legacy system that speaks HTTP or HTTPS. For example, one might want a 
Structured Streaming application to send ML model updates to an application 
server via a secured connection. In this use case, I would think that it would 
be appropriate for the data to be pulled to the driver as in the PR. However, 
you would want a lot more configurability in any facility built into Spark; 
otherwise, there isn't much benefit over what a user could throw together in a 
few minutes over the (as yet unopened, but hopefully soon available) Sink API.
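A minimal sketch of the first approach: a ForeachSink-style writer that buffers a partition's rows and ships them as one JSON payload when the partition closes. The {{HttpForeachWriter}} name is illustrative, not an existing Spark API, and the transport is injected so that an HTTP POST (via e.g. {{http.client}} on each executor) can be swapped in or mocked.

```python
import json

class HttpForeachWriter:
    """Illustrative ForeachSink-style writer. In a real deployment,
    `send` would POST the payload over HTTP from the executor that
    owns the partition, keeping data out of the driver."""

    def __init__(self, send):
        self.send = send
        self.buffer = []

    def open(self, partition_id, epoch_id):
        self.buffer = []   # start a fresh buffer for this partition/epoch
        return True        # accept the partition

    def process(self, row):
        self.buffer.append(row)

    def close(self, error):
        # only ship the payload if the partition completed cleanly
        if error is None and self.buffer:
            self.send(json.dumps(self.buffer))

# a plain list stands in for the HTTP transport here
sent = []
writer = HttpForeachWriter(sent.append)
writer.open(partition_id=0, epoch_id=0)
writer.process({"word": "spark", "count": 3})
writer.close(None)
```

This mirrors the open/process/close contract a foreach-style sink exposes, which is why a built-in HTTP sink adds little over what users can assemble themselves for this use case.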

> Structured Streaming - Do we need to output results through http API?
> -
>
> Key: SPARK-17631
> URL: https://issues.apache.org/jira/browse/SPARK-17631
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL, Streaming
>Affects Versions: 2.0.0
>Reporter: zhangxinyu
>Priority: Minor
>
> Streaming query results can be sent to an HTTP server through HTTP POST requests






[jira] [Commented] (SPARK-16407) Allow users to supply custom StreamSinkProviders

2016-09-20 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15508260#comment-15508260
 ] 

Frederick Reiss commented on SPARK-16407:
-

It looks like there is a general agreement here that Structured Streaming ought 
to have some way to direct the output of a streaming query to the Spark driver. 
But there is a disagreement about what shape that API should take. Can we have 
a discussion on this thread about what would be an acceptable design to meet 
this requirement, or would it be better to move over to the dev list?

> Allow users to supply custom StreamSinkProviders
> 
>
> Key: SPARK-16407
> URL: https://issues.apache.org/jira/browse/SPARK-16407
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: holdenk
>
> The current DataStreamWriter allows users to specify a class name as the 
> format; however, it could be easier for people to directly pass in a specific 
> provider instance, e.g. for a user equivalent of ForeachSink or another sink 
> with non-string parameters.






[jira] [Commented] (SPARK-10815) API design: data sources and sinks

2016-09-19 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15505102#comment-15505102
 ] 

Frederick Reiss commented on SPARK-10815:
-

I'm confused by the current description of this task.  As far as I can see, the 
interface for sources and sinks in Structured Streaming has no direct 
dependencies on RDDs. More precisely, the four traits (Source, Sink, 
StreamSourceProvider, and StreamSinkProvider) that directly comprise the 
interface do not depend on the {{RDD}} class. The DataStreamWriter and 
DataFrameWriter class that currently insulate users from Source, Sink, etc. 
also do not have any dependencies on {{RDD}}.

Is this issue intended perhaps to reference limitations of the implementations 
of Datasets that require unnecessary direct access to the Dataset's internal 
RDD? 

> API design: data sources and sinks
> --
>
> Key: SPARK-10815
> URL: https://issues.apache.org/jira/browse/SPARK-10815
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL, Streaming
>Reporter: Reynold Xin
>
> The existing (in 2.0) source/sink interface for structured streaming depends 
> on RDDs. This dependency has two issues:
> 1. The RDD interface is wide and difficult to stabilize across versions. This 
> is similar to point 1 in https://issues.apache.org/jira/browse/SPARK-15689. 
> Ideally, a source/sink implementation created for Spark 2.x should work in 
> Spark 10.x, assuming the JVM is still around.
> 2. It is difficult to swap in/out a different execution engine.
> The purpose of this ticket is to create a stable interface that addresses the 
> above two.






[jira] [Commented] (SPARK-16407) Allow users to supply custom StreamSinkProviders

2016-09-15 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494795#comment-15494795
 ] 

Frederick Reiss commented on SPARK-16407:
-

With respect, I'm not seeing a whole lot of flux in those APIs, or in 
Structured Streaming in general.  SPARK-8360, the JIRA for source and sink API 
design, has no description, no comments, and no changes since November of 2015. 
 The Structured Streaming design doc hasn't changed appreciably since May.  
Even short-term tasks in Structured Streaming are very thin on the ground, and 
PR traffic is minimal. Is something going on behind closed doors that other 
members of the community are not aware of?

> Allow users to supply custom StreamSinkProviders
> 
>
> Key: SPARK-16407
> URL: https://issues.apache.org/jira/browse/SPARK-16407
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: holdenk
>
> The current DataStreamWriter allows users to specify a class name as the 
> format; however, it could be easier for people to directly pass in a specific 
> provider instance, e.g. for a user equivalent of ForeachSink or another sink 
> with non-string parameters.






[jira] [Created] (SPARK-17543) Missing log4j config file for tests in common/network-shuffle

2016-09-14 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17543:
---

 Summary: Missing log4j config file for tests in 
common/network-shuffle
 Key: SPARK-17543
 URL: https://issues.apache.org/jira/browse/SPARK-17543
 Project: Spark
  Issue Type: Bug
Reporter: Frederick Reiss
Priority: Trivial


*This is a small starter task to help new contributors practice the pull 
request and code review process.*

The Maven module {{common/network-shuffle}} does not have a log4j configuration 
file for its test cases. Usually these configuration files are located inside 
each module, in the directory {{src/test/resources}}. The missing configuration 
file leads to a scary-looking but harmless series of errors and stack traces in 
Spark build logs:
{noformat}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
log4j:ERROR Could not read configuration file from URL [file:src/test/resources/log4j.properties].
java.io.FileNotFoundException: src/test/resources/log4j.properties (No such file or directory)
    at java.io.FileInputStream.open(Native Method)
    at java.io.FileInputStream.<init>(FileInputStream.java:146)
    at java.io.FileInputStream.<init>(FileInputStream.java:101)
    at sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
    at sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
    at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
    at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
    at org.apache.log4j.Logger.getLogger(Logger.java:104)
    at io.netty.util.internal.logging.Log4JLoggerFactory.newInstance(Log4JLoggerFactory.java:29)
    at io.netty.util.internal.logging.InternalLoggerFactory.newDefaultFactory(InternalLoggerFactory.java:46)
    at io.netty.util.internal.logging.InternalLoggerFactory.<clinit>(InternalLoggerFactory.java:34)
{noformat}






[jira] [Created] (SPARK-17542) Compiler warning in UnsafeInMemorySorter class

2016-09-14 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17542:
---

 Summary: Compiler warning in UnsafeInMemorySorter class
 Key: SPARK-17542
 URL: https://issues.apache.org/jira/browse/SPARK-17542
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: Frederick Reiss
Priority: Trivial


*This is a small starter task to help new contributors practice the pull 
request and code review process.*

When building Spark with Java 8, there is a compiler warning in the class 
UnsafeInMemorySorter:
{noformat}
[WARNING] .../core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java:284:
    warning: [static] static method should be qualified by type name,
    TaskMemoryManager, instead of by an expression
{noformat}

Spark should compile without these kinds of warnings.






[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-09-13 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15487780#comment-15487780
 ] 

Frederick Reiss commented on SPARK-15406:
-

+1 for taking the simple route in the short term. I'm seeing a lot of people at 
my work who are just kicking the tires on Structured Streaming at this point. 
Right now, we just need a Kafka connector that is correct and easy to use; 
performance and advanced functionality are of secondary importance. 

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index






[jira] [Created] (SPARK-17513) StreamExecution should discard unneeded metadata

2016-09-12 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17513:
---

 Summary: StreamExecution should discard unneeded metadata
 Key: SPARK-17513
 URL: https://issues.apache.org/jira/browse/SPARK-17513
 Project: Spark
  Issue Type: Sub-task
  Components: Streaming
Reporter: Frederick Reiss


The StreamExecution maintains a write-ahead log of batch metadata in order to 
allow repeating previously in-flight batches if the driver is restarted. 
StreamExecution does not garbage-collect or compact this log in any way.

Since the log is implemented with HDFSMetadataLog, these files will consume 
memory on the HDFS NameNode. Specifically, each log file will consume about 300 
bytes of NameNode memory (150 bytes for the inode and 150 bytes for the block 
of file contents; see 
[https://www.cloudera.com/documentation/enterprise/latest/topics/admin_nn_memory_config.html].
 An application with a 100 msec batch interval will increase the NameNode's 
heap usage by about 250MB per day.

There is also the matter of recovery. StreamExecution reads its entire log when 
restarting. This read operation will be very expensive if the log contains 
millions of entries spread over millions of files.
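The heap-growth figure above follows from simple arithmetic, assuming the ~300 bytes of NameNode heap per log file from the Cloudera guidance cited:

```python
BYTES_PER_FILE = 300           # 150 bytes for the inode + 150 for the block entry
BATCH_INTERVAL_SEC = 0.1       # 100 msec batch interval, one log file per batch

batches_per_day = 24 * 60 * 60 / BATCH_INTERVAL_SEC   # 864,000 files per day
heap_growth_mb = batches_per_day * BYTES_PER_FILE / 1e6
print(round(heap_growth_mb))   # about 259 MB/day, i.e. roughly the 250MB cited
```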






[jira] [Updated] (SPARK-17475) HDFSMetadataLog should not leak CRC files

2016-09-09 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-17475:

Description: 
When HDFSMetadataLog uses a log directory on a filesystem other than HDFS (i.e. 
NFS or the driver node's local filesystem), the class leaves orphan checksum 
(CRC) files in the log directory. The files have names that follow the pattern 
"..[long UUID hex string].tmp.crc". These files exist because HDFSMetaDataLog 
renames other temporary files without renaming the corresponding checksum 
files. There is one CRC file per batch, so the directory fills up quite quickly.

I'm not certain, but this problem might also occur on certain versions of the 
HDFS APIs.

  was:
When HDFSMetadataLog uses a log directory on a filesystem other than HDFS (i.e. 
NFS or the driver node's local filesystem), the class leaves orphan checksum 
(CRC) files in the log directory. The files have names that follow the pattern 
"..[long UUID hex string].tmp.crc". These files exist HDFSMetaDataLog renames 
other temporary files without renaming the corresponding checksum files. There 
is one CRC file per batch, so the directory fills up quite quickly.

I'm not certain, but this problem might also occur on certain versions of the 
HDFS APIs.


> HDFSMetadataLog should not leak CRC files
> -
>
> Key: SPARK-17475
> URL: https://issues.apache.org/jira/browse/SPARK-17475
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Frederick Reiss
>
> When HDFSMetadataLog uses a log directory on a filesystem other than HDFS 
> (i.e. NFS or the driver node's local filesystem), the class leaves orphan 
> checksum (CRC) files in the log directory. The files have names that follow 
> the pattern "..[long UUID hex string].tmp.crc". These files exist because 
> HDFSMetaDataLog renames other temporary files without renaming the 
> corresponding checksum files. There is one CRC file per batch, so the 
> directory fills up quite quickly.
> I'm not certain, but this problem might also occur on certain versions of the 
> HDFS APIs.
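The cleanup the description implies is missing can be sketched as follows. The {{rename_with_crc}} helper is hypothetical and uses local-filesystem semantics; an actual fix would go through Hadoop's filesystem APIs, where ChecksumFileSystem manages the hidden ".<name>.crc" sidecar files.

```python
import os

def rename_with_crc(src, dst):
    """Rename a temp file into place and move its hidden ".<name>.crc"
    checksum sidecar along with it, so no orphan CRC file is left
    behind for each batch."""
    def crc_path(p):
        d, name = os.path.split(p)
        return os.path.join(d, "." + name + ".crc")

    os.rename(src, dst)
    if os.path.exists(crc_path(src)):
        os.rename(crc_path(src), crc_path(dst))
```

Renaming only {{src}} while leaving its sidecar behind reproduces the leak: one orphan CRC file per batch.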






[jira] [Commented] (SPARK-17421) Warnings about "MaxPermSize" parameter when building with Maven and Java 8

2016-09-09 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477864#comment-15477864
 ] 

Frederick Reiss commented on SPARK-17421:
-

Committer feedback on first PR was that the necessary build changes to fix this 
issue are too substantial to be appropriate. Created a second PR that only 
documents the compile warnings as a known issue and explains why these warnings 
occur even when the user sets MAVEN_OPTS. 

> Warnings about "MaxPermSize" parameter when building with Maven and Java 8
> --
>
> Key: SPARK-17421
> URL: https://issues.apache.org/jira/browse/SPARK-17421
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Reporter: Frederick Reiss
>Priority: Minor
>
> When building Spark with {{build/mvn}} or {{dev/run-tests}}, a Java warning 
> appears repeatedly on STDERR:
> {{OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support 
> was removed in 8.0}}
> This warning is due to {{build/mvn}} adding the {{-XX:MaxPermSize=512M}} 
> option to {{MAVEN_OPTS}}. When compiling with Java 7, this parameter is 
> essential. With Java 8, the parameter leads to the warning above.
> Because {{build/mvn}} adds {{MaxPermSize}} to {{MAVEN_OPTS}}, even if that 
> environment variable doesn't contain the option, setting {{MAVEN_OPTS}} to a 
> string that does not contain {{MaxPermSize}} has no effect.






[jira] [Created] (SPARK-17475) HDFSMetadataLog should not leak CRC files

2016-09-09 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17475:
---

 Summary: HDFSMetadataLog should not leak CRC files
 Key: SPARK-17475
 URL: https://issues.apache.org/jira/browse/SPARK-17475
 Project: Spark
  Issue Type: Sub-task
  Components: Streaming
Reporter: Frederick Reiss


When HDFSMetadataLog uses a log directory on a filesystem other than HDFS (i.e. 
NFS or the driver node's local filesystem), the class leaves orphan checksum 
(CRC) files in the log directory. The files have names that follow the pattern 
"..[long UUID hex string].tmp.crc". These files exist because HDFSMetaDataLog 
renames other temporary files without renaming the corresponding checksum 
files. There is one CRC file per batch, so the directory fills up quite quickly.

I'm not certain, but this problem might also occur on certain versions of the 
HDFS APIs.






[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls

2016-09-07 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-17386:

Description: 
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will sit in a loop calling {{getOffset()}} every 10 msec 
(the default value of STREAMING_POLLING_DELAY) on every {{Source}} until new 
data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this rapid polling leads to excessive CPU usage.

In a production environment, this overhead could disrupt critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If no 
data has arrived, Spark will list the directory's contents up to 100 times per 
second. This overhead could disrupt service to other systems using HDFS, 
including Spark itself. A similar situation will exist with the Kafka source, 
the {{getOffset()}} method of which will presumably call Kafka's 
{{Consumer.poll()}} method.

  was:
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 
msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until 
new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If no 
data has arrived, Spark will list the directory's contents up to 100 times per 
second. This overhead could disrupt service to other systems using HDFS, 
including Spark itself. A similar situation will exist with the Kafka source, 
the {{getOffset()}} method of which will presumably call Kafka's 
{{Consumer.poll()}} method.


> Default polling and trigger intervals cause excessive RPC calls
> ---
>
> Key: SPARK-17386
> URL: https://issues.apache.org/jira/browse/SPARK-17386
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Frederick Reiss
>Priority: Minor
>
> The default trigger interval for a Structured Streaming query is 
> {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
> When the trigger is set to this default value, the scheduler in 
> {{StreamExecution}} will sit in a loop calling {{getOffset()}} every 10 msec 
> (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until new 
> data arrives.
> In test cases, where most of the sources are {{MemoryStream}} or 
> {{TextSocketSource}}, this rapid polling leads to excessive CPU usage.
> In a production environment, this overhead could disrupt critical 
> infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} 
> or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
> {{FileStreamSource}} performs a directory listing of an HDFS directory. If no 
> data has arrived, Spark will list the directory's contents up to 100 times 
> per second. This overhead could disrupt service to other systems using HDFS, 
> including Spark itself. A similar situation will exist with the Kafka source, 
> the {{getOffset()}} method of which will presumably call Kafka's 
> {{Consumer.poll()}} method.
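The listing rate claimed above is a direct consequence of the default polling delay; a back-of-envelope check, with STREAMING_POLLING_DELAY's 10 msec default as the only input:

```python
POLLING_DELAY_SEC = 0.010   # STREAMING_POLLING_DELAY default (10 msec)

# With ProcessingTime(0) and no new data, the scheduler loop calls
# getOffset() on every Source once per polling delay.
calls_per_second = round(1 / POLLING_DELAY_SEC)
print(calls_per_second)     # 100 getOffset() calls (directory listings) per second
```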






[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls

2016-09-07 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-17386:

Description: 
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 
msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until 
new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If no 
data has arrived, Spark will list the directory's contents up to 100 times per 
second. This overhead could disrupt service to other systems using HDFS, 
including Spark itself. A similar situation will exist with the Kafka source, 
the {{getOffset()}} method of which will presumably call Kafka's 
{{Consumer.poll()}} method.

  was:
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 
msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until 
new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If the 
scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will 
list an HDFS directory's contents up to 100 times per second. This overhead 
could disrupt service to other systems using HDFS, including Spark itself. A 
similar situation will exist with the Kafka source, the {{getOffset()}} method 
of which will presumably call Kafka's {{Consumer.poll()}} method.


> Default polling and trigger intervals cause excessive RPC calls
> ---
>
> Key: SPARK-17386
> URL: https://issues.apache.org/jira/browse/SPARK-17386
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Frederick Reiss
>Priority: Minor
>
> The default trigger interval for a Structured Streaming query is 
> {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
> When the trigger is set to this default value, the scheduler in 
> {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 
> 10 msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} 
> until new data arrives.
> In test cases, where most of the sources are {{MemoryStream}} or 
> {{TextSocketSource}}, this spinning leads to excessive CPU usage.
> In a production environment, this spinning could take down critical 
> infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} 
> or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
> {{FileStreamSource}} performs a directory listing of an HDFS directory. If no 
> data has arrived, Spark will list the directory's contents up to 100 times 
> per second. This overhead could disrupt service to other systems using HDFS, 
> including Spark itself. A similar situation will exist with the Kafka source, 
> the {{getOffset()}} method of which will presumably call Kafka's 
> {{Consumer.poll()}} method.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls

2016-09-07 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-17386:

Description: 
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 
msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until 
new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If the 
scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will 
list an HDFS directory's contents up to 100 times per second. This overhead 
could disrupt service to other systems using HDFS, including Spark itself. A 
similar situation will exist with the Kafka source, the {{getOffset()}} method 
of which will presumably call Kafka's {{Consumer.poll()}} method.

  was:
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 
msec (the default value of on every {{Source}} until new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If the 
scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will 
make hundreds of RPC calls per second to the HDFS NameNode. This overhead could 
disrupt service to other systems using HDFS, including Spark itself. A similar 
situation will exist with the Kafka source, the {{getOffset()}} method of which 
will presumably call Kafka's {{Consumer.poll()}} method.


> Default polling and trigger intervals cause excessive RPC calls
> ---
>
> Key: SPARK-17386
> URL: https://issues.apache.org/jira/browse/SPARK-17386
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Frederick Reiss
>Priority: Minor
>
> The default trigger interval for a Structured Streaming query is 
> {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
> When the trigger is set to this default value, the scheduler in 
> {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 
> 10 msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} 
> until new data arrives.
> In test cases, where most of the sources are {{MemoryStream}} or 
> {{TextSocketSource}}, this spinning leads to excessive CPU usage.
> In a production environment, this spinning could take down critical 
> infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} 
> or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
> {{FileStreamSource}} performs a directory listing of an HDFS directory. If 
> the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark 
> will list an HDFS directory's contents up to 100 times per second. This 
> overhead could disrupt service to other systems using HDFS, including Spark 
> itself. A similar situation will exist with the Kafka source, the 
> {{getOffset()}} method of which will presumably call Kafka's 
> {{Consumer.poll()}} method.






[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls

2016-09-07 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-17386:

Description: 
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 
msec (the default value of on every {{Source}} until new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If the 
scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will 
make hundreds of RPC calls per second to the HDFS NameNode. This overhead could 
disrupt service to other systems using HDFS, including Spark itself. A similar 
situation will exist with the Kafka source, the {{getOffset()}} method of which 
will presumably call Kafka's {{Consumer.poll()}} method.

  was:
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 
msec on every {{Source}} until new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If the 
scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will 
make hundreds of RPC calls per second to the HDFS NameNode. This overhead could 
disrupt service to other systems using HDFS, including Spark itself. A similar 
situation will exist with the Kafka source, the {{getOffset()}} method of which 
will presumably call Kafka's {{Consumer.poll()}} method.


> Default polling and trigger intervals cause excessive RPC calls
> ---
>
> Key: SPARK-17386
> URL: https://issues.apache.org/jira/browse/SPARK-17386
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Frederick Reiss
>Priority: Minor
>
> The default trigger interval for a Structured Streaming query is 
> {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
> When the trigger is set to this default value, the scheduler in 
> {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 
> 10 msec (the default value of on every {{Source}} until new data arrives.
> In test cases, where most of the sources are {{MemoryStream}} or 
> {{TextSocketSource}}, this spinning leads to excessive CPU usage.
> In a production environment, this spinning could take down critical 
> infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} 
> or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
> {{FileStreamSource}} performs a directory listing of an HDFS directory. If 
> the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark 
> will make hundreds of RPC calls per second to the HDFS NameNode. This 
> overhead could disrupt service to other systems using HDFS, including Spark 
> itself. A similar situation will exist with the Kafka source, the 
> {{getOffset()}} method of which will presumably call Kafka's 
> {{Consumer.poll()}} method.






[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls

2016-09-07 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-17386:

Priority: Minor  (was: Major)

> Default polling and trigger intervals cause excessive RPC calls
> ---
>
> Key: SPARK-17386
> URL: https://issues.apache.org/jira/browse/SPARK-17386
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Frederick Reiss
>Priority: Minor
>
> The default trigger interval for a Structured Streaming query is 
> {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
> When the trigger is set to this default value, the scheduler in 
> {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 
> 10 msec on every {{Source}} until new data arrives.
> In test cases, where most of the sources are {{MemoryStream}} or 
> {{TextSocketSource}}, this spinning leads to excessive CPU usage.
> In a production environment, this spinning could take down critical 
> infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} 
> or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
> {{FileStreamSource}} performs a directory listing of an HDFS directory. If 
> the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark 
> will make hundreds of RPC calls per second to the HDFS NameNode. This 
> overhead could disrupt service to other systems using HDFS, including Spark 
> itself. A similar situation will exist with the Kafka source, the 
> {{getOffset()}} method of which will presumably call Kafka's 
> {{Consumer.poll()}} method.






[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls

2016-09-07 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-17386:

Summary: Default polling and trigger intervals cause excessive RPC calls  
(was: Default trigger interval causes excessive RPC calls)

> Default polling and trigger intervals cause excessive RPC calls
> ---
>
> Key: SPARK-17386
> URL: https://issues.apache.org/jira/browse/SPARK-17386
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Frederick Reiss
>
> The default trigger interval for a Structured Streaming query is 
> {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
> When the trigger is set to this default value, the scheduler in 
> {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 
> 10 msec on every {{Source}} until new data arrives.
> In test cases, where most of the sources are {{MemoryStream}} or 
> {{TextSocketSource}}, this spinning leads to excessive CPU usage.
> In a production environment, this spinning could take down critical 
> infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} 
> or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
> {{FileStreamSource}} performs a directory listing of an HDFS directory. If 
> the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark 
> will make hundreds of RPC calls per second to the HDFS NameNode. This 
> overhead could disrupt service to other systems using HDFS, including Spark 
> itself. A similar situation will exist with the Kafka source, the 
> {{getOffset()}} method of which will presumably call Kafka's 
> {{Consumer.poll()}} method.






[jira] [Updated] (SPARK-17386) Default trigger interval causes excessive RPC calls

2016-09-07 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-17386:

Description: 
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 
msec on every {{Source}} until new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If the 
scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will 
make hundreds of RPC calls per second to the HDFS NameNode. This overhead could 
disrupt service to other systems using HDFS, including Spark itself. A similar 
situation will exist with the Kafka source, the {{getOffset()}} method of which 
will presumably call Kafka's {{Consumer.poll()}} method.

  was:
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} on every 
{{Source}} until new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If the 
scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will 
make hundreds of RPC calls per second to the HDFS NameNode. This overhead could 
disrupt service to other systems using HDFS, including Spark itself. A similar 
situation will exist with the Kafka source, the {{getOffset()}} method of which 
will presumably call Kafka's {{Consumer.poll()}} method.


> Default trigger interval causes excessive RPC calls
> ---
>
> Key: SPARK-17386
> URL: https://issues.apache.org/jira/browse/SPARK-17386
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Frederick Reiss
>
> The default trigger interval for a Structured Streaming query is 
> {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
> When the trigger is set to this default value, the scheduler in 
> {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 
> 10 msec on every {{Source}} until new data arrives.
> In test cases, where most of the sources are {{MemoryStream}} or 
> {{TextSocketSource}}, this spinning leads to excessive CPU usage.
> In a production environment, this spinning could take down critical 
> infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} 
> or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
> {{FileStreamSource}} performs a directory listing of an HDFS directory. If 
> the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark 
> will make hundreds of RPC calls per second to the HDFS NameNode. This 
> overhead could disrupt service to other systems using HDFS, including Spark 
> itself. A similar situation will exist with the Kafka source, the 
> {{getOffset()}} method of which will presumably call Kafka's 
> {{Consumer.poll()}} method.






[jira] [Created] (SPARK-17421) Warnings about "MaxPermSize" parameter when building with Maven and Java 8

2016-09-06 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17421:
---

 Summary: Warnings about "MaxPermSize" parameter when building with 
Maven and Java 8
 Key: SPARK-17421
 URL: https://issues.apache.org/jira/browse/SPARK-17421
 Project: Spark
  Issue Type: Bug
  Components: Build
Reporter: Frederick Reiss
Priority: Minor


When building Spark with {{build/mvn}} or {{dev/run-tests}}, a Java warning 
appears repeatedly on STDERR:
{{OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support 
was removed in 8.0}}

This warning is due to {{build/mvn}} adding the {{-XX:MaxPermSize=512M}} option 
to {{MAVEN_OPTS}}. When compiling with Java 7, this parameter is essential. 
With Java 8, the parameter leads to the warning above.

Because {{build/mvn}} adds {{MaxPermSize}} to {{MAVEN_OPTS}}, even if that 
environment variable doesn't contain the option, setting {{MAVEN_OPTS}} to a 
string that does not contain {{MaxPermSize}} has no effect.
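A fix would need to make the flag conditional on the JVM version rather than unconditional. The sketch below is a hypothetical helper (not the actual {{build/mvn}} logic, which is a shell script) showing the intended behavior: add {{-XX:MaxPermSize}} only under Java 7, where the permanent generation still exists.

```python
# Hypothetical version-gated construction of MAVEN_OPTS. Java 8 removed the
# permanent generation, so passing MaxPermSize there only produces the
# "support was removed in 8.0" warning quoted above.
def maven_opts_for(java_major: int, base_opts: str = "") -> str:
    opts = base_opts.split()
    if java_major < 8:
        opts.append("-XX:MaxPermSize=512M")  # essential for Java 7 builds
    return " ".join(opts)

print(maven_opts_for(7))  # -> -XX:MaxPermSize=512M
print(maven_opts_for(8))  # -> (empty string: no flag, no JVM warning)
```

Gating on the detected Java version also preserves any user-supplied {{MAVEN_OPTS}}, which the current unconditional append overrides.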






[jira] [Updated] (SPARK-17386) Default trigger interval causes excessive RPC calls

2016-09-02 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-17386:

Description: 
The default trigger interval for a Structured Streaming query is 
{{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
When the trigger is set to this default value, the scheduler in 
{{StreamExecution}} will spin in a tight loop calling {{getOffset()}} on every 
{{Source}} until new data arrives.

In test cases, where most of the sources are {{MemoryStream}} or 
{{TextSocketSource}}, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or 
the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
{{FileStreamSource}} performs a directory listing of an HDFS directory. If the 
scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will 
make hundreds of RPC calls per second to the HDFS NameNode. This overhead could 
disrupt service to other systems using HDFS, including Spark itself. A similar 
situation will exist with the Kafka source, the {{getOffset()}} method of which 
will presumably call Kafka's {{Consumer.poll()}} method.

  was:
The default trigger interval for a Structured Streaming query is 
`ProcessingTime(0)`, i.e. "trigger new microbatches as fast as possible". When 
the trigger is set to this default value, the scheduler in `StreamExecution` 
will spin in a tight loop calling `getOffset()` on every `Source` until new 
data arrives.

In test cases, where most of the sources are `MemoryStream` or 
`TextSocketSource`, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be `FileStreamSource` or 
the not-yet-written Kafka 0.10 Source. The `getOffset()` method of 
`FileStreamSource` performs a directory listing of an HDFS directory. If the 
scheduler calls `FileStreamSource.getOffset()` in a tight loop, Spark will make 
several hundred RPC calls per second to the HDFS NameNode. This overhead could 
disrupt service to other systems using HDFS, including Spark itself. A similar 
situation will exist with the Kafka source, the `getOffset()` method of which 
will presumably call Kafka's `Consumer.poll()` method.


> Default trigger interval causes excessive RPC calls
> ---
>
> Key: SPARK-17386
> URL: https://issues.apache.org/jira/browse/SPARK-17386
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Frederick Reiss
>
> The default trigger interval for a Structured Streaming query is 
> {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". 
> When the trigger is set to this default value, the scheduler in 
> {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} on 
> every {{Source}} until new data arrives.
> In test cases, where most of the sources are {{MemoryStream}} or 
> {{TextSocketSource}}, this spinning leads to excessive CPU usage.
> In a production environment, this spinning could take down critical 
> infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} 
> or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of 
> {{FileStreamSource}} performs a directory listing of an HDFS directory. If 
> the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark 
> will make hundreds of RPC calls per second to the HDFS NameNode. This 
> overhead could disrupt service to other systems using HDFS, including Spark 
> itself. A similar situation will exist with the Kafka source, the 
> {{getOffset()}} method of which will presumably call Kafka's 
> {{Consumer.poll()}} method.






[jira] [Created] (SPARK-17386) Default trigger interval causes excessive RPC calls

2016-09-02 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17386:
---

 Summary: Default trigger interval causes excessive RPC calls
 Key: SPARK-17386
 URL: https://issues.apache.org/jira/browse/SPARK-17386
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Reporter: Frederick Reiss


The default trigger interval for a Structured Streaming query is 
`ProcessingTime(0)`, i.e. "trigger new microbatches as fast as possible". When 
the trigger is set to this default value, the scheduler in `StreamExecution` 
will spin in a tight loop calling `getOffset()` on every `Source` until new 
data arrives.

In test cases, where most of the sources are `MemoryStream` or 
`TextSocketSource`, this spinning leads to excessive CPU usage.

In a production environment, this spinning could take down critical 
infrastructure. Most sources in Spark clusters will be `FileStreamSource` or 
the not-yet-written Kafka 0.10 Source. The `getOffset()` method of 
`FileStreamSource` performs a directory listing of an HDFS directory. If the 
scheduler calls `FileStreamSource.getOffset()` in a tight loop, Spark will make 
several hundred RPC calls per second to the HDFS NameNode. This overhead could 
disrupt service to other systems using HDFS, including Spark itself. A similar 
situation will exist with the Kafka source, the `getOffset()` method of which 
will presumably call Kafka's `Consumer.poll()` method.






[jira] [Commented] (SPARK-13534) Implement Apache Arrow serializer for Spark DataFrame for use in DataFrame.toPandas

2016-09-01 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15456990#comment-15456990
 ] 

Frederick Reiss commented on SPARK-13534:
-

[~wesmckinn], are you planning to work on this issue soon?

> Implement Apache Arrow serializer for Spark DataFrame for use in 
> DataFrame.toPandas
> ---
>
> Key: SPARK-13534
> URL: https://issues.apache.org/jira/browse/SPARK-13534
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Reporter: Wes McKinney
>
> The current code path for accessing Spark DataFrame data in Python using 
> PySpark passes through an inefficient serialization-deserialiation process 
> that I've examined at a high level here: 
> https://gist.github.com/wesm/0cb5531b1c2e346a0007. Currently, RDD[Row] 
> objects are being deserialized in pure Python as a list of tuples, which are 
> then converted to pandas.DataFrame using its {{from_records}} alternate 
> constructor. This also uses a large amount of memory.
> For flat (no nested types) schemas, the Apache Arrow memory layout 
> (https://github.com/apache/arrow/tree/master/format) can be deserialized to 
> {{pandas.DataFrame}} objects with comparatively small overhead compared with 
> memcpy / system memory bandwidth -- Arrow's bitmasks must be examined, 
> replacing the corresponding null values with pandas's sentinel values (None 
> or NaN as appropriate).
> I will be contributing patches to Arrow in the coming weeks for converting 
> between Arrow and pandas in the general case, so if Spark can send Arrow 
> memory to PySpark, we will hopefully be able to increase the Python data 
> access throughput by an order of magnitude or more. I propose to add an new 
> serializer for Spark DataFrame and a new method that can be invoked from 
> PySpark to request a Arrow memory-layout byte stream, prefixed by a data 
> header indicating array buffer offsets and sizes.
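The null-handling step mentioned in the description, consulting an Arrow validity bitmask and substituting a sentinel, can be sketched in plain Python. This is an illustrative stdlib-only model of the concept, not Arrow's actual (vectorized, buffer-level) implementation; the function name and list-based representation are assumptions for the example.

```python
# Conceptual sketch: an Arrow-style validity bitmask marks which slots hold
# real values. Deserializing to pandas means replacing invalid slots with a
# sentinel (None here; pandas would use None or NaN as appropriate).
def apply_validity_bitmask(values, bitmask):
    """bitmask[i] truthy => values[i] is valid; falsy => null (None)."""
    return [v if bit else None for v, bit in zip(values, bitmask)]

print(apply_validity_bitmask([1, 2, 3, 4], [1, 0, 1, 1]))  # -> [1, None, 3, 4]
```

This per-element pass is the "comparatively small overhead" the description contrasts with deserializing RDD[Row] tuples entirely in pure Python.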






[jira] [Closed] (SPARK-17343) Prerequisites for Kafka 0.8 support in Structured Streaming

2016-08-31 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss closed SPARK-17343.
---
Resolution: Won't Fix

There will be no Kafka 0.8 connectors for Structured Streaming. See comments on 
SPARK-15406.

> Prerequisites for Kafka 0.8 support in Structured Streaming
> ---
>
> Key: SPARK-17343
> URL: https://issues.apache.org/jira/browse/SPARK-17343
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Frederick Reiss
>
> This issue covers any API changes, refactoring, and utility classes/methods 
> that are necessary to make it possible to implement support for Kafka 0.8 
> sources and sinks in Structured Streaming.
> From a quick glance, it looks like some refactoring of the existing state 
> storage mechanism in the Kafka 0.8 DStream may suffice. But there might be 
> some additional groundwork in other areas.






[jira] [Closed] (SPARK-17344) Kafka 0.8 support for Structured Streaming

2016-08-31 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss closed SPARK-17344.
---
Resolution: Won't Fix

There will be no Kafka 0.8 connectors for Structured Streaming. See comments on 
SPARK-15406.

> Kafka 0.8 support for Structured Streaming
> --
>
> Key: SPARK-17344
> URL: https://issues.apache.org/jira/browse/SPARK-17344
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Frederick Reiss
>
> Design and implement Kafka 0.8-based sources and sinks for Structured 
> Streaming.






[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-08-31 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15454320#comment-15454320
 ] 

Frederick Reiss commented on SPARK-15406:
-

Thanks for clearing that up. Marking SPARK-17343 and SPARK-17344 as "won't fix".

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index






[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-08-31 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15453749#comment-15453749
 ] 

Frederick Reiss commented on SPARK-15406:
-

WRT Kafka 0.8: I'm under the impression that there is a significant number of 
Spark users who are still stuck with Kafka 0.8 or 0.9. Kafka sits between 
multiple systems, so upgrades to production Kafka installations can be 
challenging. Are other people monitoring this JIRA seeing a similar situation, 
or are versions before 0.10 not in widespread use any more? If older Kafka 
releases aren't relevant, then we should probably deprecate the entire 
spark-streaming-kafka-0-8 component.

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index






[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-08-31 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15453059#comment-15453059
 ] 

Frederick Reiss commented on SPARK-15406:
-

I've taken the liberty of splitting this issue into several steps to facilitate 
collaboration.

[~c...@koeninger.org], would you mind typing up any thoughts you have about how 
these steps should move forward, as well as if there are smaller subtasks that 
others can help out with?

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index






[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka

2016-08-31 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15453047#comment-15453047
 ] 

Frederick Reiss commented on SPARK-15406:
-

There has been some discussion of this JIRA on the mailing list; see 
[http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-with-Kafka-sources-sinks-tt18645.html]

> Structured streaming support for consuming from Kafka
> -
>
> Key: SPARK-15406
> URL: https://issues.apache.org/jira/browse/SPARK-15406
> Project: Spark
>  Issue Type: New Feature
>Reporter: Cody Koeninger
>
> Structured streaming doesn't have support for kafka yet.  I personally feel 
> like time based indexing would make for a much better interface, but it's 
> been pushed back to kafka 0.10.1
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index






[jira] [Created] (SPARK-17346) Kafka 0.10 support in Structured Streaming

2016-08-31 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17346:
---

 Summary: Kafka 0.10 support in Structured Streaming
 Key: SPARK-17346
 URL: https://issues.apache.org/jira/browse/SPARK-17346
 Project: Spark
  Issue Type: Sub-task
  Components: Streaming
Reporter: Frederick Reiss


Implement Kafka 0.10-based sources and sinks for Structured Streaming.






[jira] [Created] (SPARK-17345) Prerequisites for Kafka 0.10 support in Structured Streaming

2016-08-31 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17345:
---

 Summary: Prerequisites for Kafka 0.10 support in Structured 
Streaming
 Key: SPARK-17345
 URL: https://issues.apache.org/jira/browse/SPARK-17345
 Project: Spark
  Issue Type: Sub-task
  Components: Streaming
Reporter: Frederick Reiss


This issue covers any API changes, refactoring, and utility classes/methods 
that are necessary to make it possible to implement support for Kafka 0.10 
sources and sinks in Structured Streaming.

At a minimum, the changes in SPARK-16963 will be needed in order for the Kafka 
commit protocol to work. Given that KIP-33 ("Add a time based log index") is 
not yet in place, it may be necessary to make additional API changes in Spark 
for commit to work efficiently. Some refactoring of the existing KafkaRDD class 
is probably in order also.






[jira] [Created] (SPARK-17344) Kafka 0.8 support for Structured Streaming

2016-08-31 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17344:
---

 Summary: Kafka 0.8 support for Structured Streaming
 Key: SPARK-17344
 URL: https://issues.apache.org/jira/browse/SPARK-17344
 Project: Spark
  Issue Type: Sub-task
  Components: Streaming
Reporter: Frederick Reiss


Design and implement Kafka 0.8-based sources and sinks for Structured Streaming.






[jira] [Created] (SPARK-17343) Prerequisites for Kafka 0.8 support in Structured Streaming

2016-08-31 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17343:
---

 Summary: Prerequisites for Kafka 0.8 support in Structured 
Streaming
 Key: SPARK-17343
 URL: https://issues.apache.org/jira/browse/SPARK-17343
 Project: Spark
  Issue Type: Sub-task
  Components: Streaming
Reporter: Frederick Reiss


This issue covers any API changes, refactoring, and utility classes/methods 
that are necessary to make it possible to implement support for Kafka 0.8 
sources and sinks in Structured Streaming.

From a quick glance, it looks like some refactoring of the existing state 
storage mechanism in the Kafka 0.8 DStream may suffice. But there might be 
some additional groundwork in other areas.






[jira] [Commented] (SPARK-17267) Long running structured streaming requirements

2016-08-31 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15452993#comment-15452993
 ] 

Frederick Reiss commented on SPARK-17267:
-

Converted SPARK-16963 to a subtask of this JIRA.

> Long running structured streaming requirements
> --
>
> Key: SPARK-17267
> URL: https://issues.apache.org/jira/browse/SPARK-17267
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Streaming
>Reporter: Reynold Xin
>Priority: Blocker
>
> This is an umbrella ticket to track various things that are required in order 
> to have the engine for structured streaming run non-stop in production.






[jira] [Updated] (SPARK-16963) Change Source API so that sources do not need to keep unbounded state

2016-08-31 Thread Frederick Reiss (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frederick Reiss updated SPARK-16963:

Issue Type: Sub-task  (was: Improvement)
Parent: SPARK-17267

> Change Source API so that sources do not need to keep unbounded state
> -
>
> Key: SPARK-16963
> URL: https://issues.apache.org/jira/browse/SPARK-16963
> Project: Spark
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Frederick Reiss
>
> The version of the Source API in Spark 2.0.0 defines a single getBatch() 
> method for fetching records from the source, with the following Scaladoc 
> comments defining the semantics:
> {noformat}
> /**
>  * Returns the data that is between the offsets (`start`, `end`]. When 
> `start` is `None` then
>  * the batch should begin with the first available record. This method must 
> always return the
>  * same data for a particular `start` and `end` pair.
>  */
> def getBatch(start: Option[Offset], end: Offset): DataFrame
> {noformat}
> These semantics mean that a Source must retain all past history for the 
> stream that it backs. Further, a Source is also required to retain this data 
> across restarts of the process where the Source is instantiated, even when 
> the Source is restarted on a different machine.
> These restrictions make it difficult to implement the Source API, as any 
> implementation requires potentially unbounded amounts of distributed storage.
> See the mailing list thread at 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-td18551.html]
>  for more information.
> This JIRA will cover augmenting the Source API with an additional callback 
> that will allow Structured Streaming scheduler to notify the source when it 
> is safe to discard buffered data.






[jira] [Created] (SPARK-17303) dev/run-tests fails if spark-warehouse directory exists

2016-08-29 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-17303:
---

 Summary: dev/run-tests fails if spark-warehouse directory exists
 Key: SPARK-17303
 URL: https://issues.apache.org/jira/browse/SPARK-17303
 Project: Spark
  Issue Type: Bug
  Components: Build
Reporter: Frederick Reiss
Priority: Minor


The script dev/run-tests, which is intended for verifying the correctness of 
pull requests, runs Apache RAT to check for missing Apache license headers. 
Later, the script does a full compile/package/test sequence.

The script as currently written works fine the first time. But the second time 
it runs, the Apache RAT checks fail due to the presence of the directory 
spark-warehouse, which the script indirectly creates during its regression test 
run.






[jira] [Commented] (SPARK-16963) Change Source API so that sources do not need to keep unbounded state

2016-08-26 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440598#comment-15440598
 ] 

Frederick Reiss commented on SPARK-16963:
-

Updated the pull request to address some conflicting changes in the main branch 
and to address some minor review comments. Changed the name of `getMinOffset` 
to `lastCommittedOffset` per Prashant's comments. Changes are still ready for 
review.

> Change Source API so that sources do not need to keep unbounded state
> -
>
> Key: SPARK-16963
> URL: https://issues.apache.org/jira/browse/SPARK-16963
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Frederick Reiss
>
> The version of the Source API in Spark 2.0.0 defines a single getBatch() 
> method for fetching records from the source, with the following Scaladoc 
> comments defining the semantics:
> {noformat}
> /**
>  * Returns the data that is between the offsets (`start`, `end`]. When 
> `start` is `None` then
>  * the batch should begin with the first available record. This method must 
> always return the
>  * same data for a particular `start` and `end` pair.
>  */
> def getBatch(start: Option[Offset], end: Offset): DataFrame
> {noformat}
> These semantics mean that a Source must retain all past history for the 
> stream that it backs. Further, a Source is also required to retain this data 
> across restarts of the process where the Source is instantiated, even when 
> the Source is restarted on a different machine.
> These restrictions make it difficult to implement the Source API, as any 
> implementation requires potentially unbounded amounts of distributed storage.
> See the mailing list thread at 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-td18551.html]
>  for more information.
> This JIRA will cover augmenting the Source API with an additional callback 
> that will allow Structured Streaming scheduler to notify the source when it 
> is safe to discard buffered data.






[jira] [Commented] (SPARK-17165) FileStreamSource should not track the list of seen files indefinitely

2016-08-26 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439391#comment-15439391
 ] 

Frederick Reiss commented on SPARK-17165:
-

This problem is actually deeper than just FileStreamSource. With the current 
version of the Source trait, *every* source needs to keep infinite state. 
[~scrapco...@gmail.com] ran into that issue while writing a connector for MQTT. 
I opened SPARK-16963 a few weeks back to cover the core issue with the Source 
trait. My open PR for that JIRA (https://github.com/apache/spark/pull/14553) 
has a fair amount of overlap with the PR here and with the one in SPARK-17235.

Can we merge our efforts here to make a single sequence of small, 
easy-to-review change sets that will resolve these state management issues 
across all sources? I'm thinking that we can create a single JIRA (or reuse one 
of the existing ones) to cover "keep only bounded state for Structured 
Streaming data sources", then divide that JIRA into the following tasks:
# Add a method to `Source` to trigger cleaning of processed data
# Add a method to `HDFSMetadataLog` to clean out processed metadata
# Implement garbage collection of old data (metadata and files) in 
`FileStreamSource`
# Implement garbage collection of old data in `MemoryStream` and other stubs of 
Source
# Modify the scheduler (`StreamExecution`) so that it triggers garbage 
collection of data and metadata

Thoughts?
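Task 2 above might look roughly like the following plain-Python sketch. The class, the batch-id keying, and the `purge` method are illustrative assumptions for discussion, not Spark's actual `HDFSMetadataLog` API:

```python
# Hypothetical sketch of a batch-id-keyed metadata log with a purge()
# method (task 2 in the list above). Names are illustrative only.
class MetadataLog:
    def __init__(self):
        self._entries = {}  # batch id -> metadata payload

    def add(self, batch_id, metadata):
        # Record metadata for a batch exactly once.
        if batch_id in self._entries:
            return False
        self._entries[batch_id] = metadata
        return True

    def get(self, batch_id):
        return self._entries.get(batch_id)

    def purge(self, threshold_batch_id):
        # Drop metadata for all batches strictly below the threshold,
        # i.e. batches the scheduler has marked as fully processed.
        for bid in [b for b in self._entries if b < threshold_batch_id]:
            del self._entries[bid]

log = MetadataLog()
for i in range(5):
    log.add(i, "offsets-%d" % i)
log.purge(3)
print(sorted(log._entries))  # [3, 4]
```

The scheduler (task 5) would call `purge` once a batch is known to be committed, keeping the log's footprint bounded.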

> FileStreamSource should not track the list of seen files indefinitely
> -
>
> Key: SPARK-17165
> URL: https://issues.apache.org/jira/browse/SPARK-17165
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Streaming
>Reporter: Reynold Xin
>
> FileStreamSource currently tracks all the files seen indefinitely, which 
> means it can run out of memory or overflow.






[jira] [Commented] (SPARK-16963) Change Source API so that sources do not need to keep unbounded state

2016-08-22 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15431779#comment-15431779
 ] 

Frederick Reiss commented on SPARK-16963:
-

The proposed changes in the attached PR are now ready for review. [~marmbrus] 
can you please have a look at your convenience? [~prashant_] can you also 
please have a look with a particular focus on whether the changes fit with the 
MQTT connector?

> Change Source API so that sources do not need to keep unbounded state
> -
>
> Key: SPARK-16963
> URL: https://issues.apache.org/jira/browse/SPARK-16963
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 2.0.0
>Reporter: Frederick Reiss
>
> The version of the Source API in Spark 2.0.0 defines a single getBatch() 
> method for fetching records from the source, with the following Scaladoc 
> comments defining the semantics:
> {noformat}
> /**
>  * Returns the data that is between the offsets (`start`, `end`]. When 
> `start` is `None` then
>  * the batch should begin with the first available record. This method must 
> always return the
>  * same data for a particular `start` and `end` pair.
>  */
> def getBatch(start: Option[Offset], end: Offset): DataFrame
> {noformat}
> These semantics mean that a Source must retain all past history for the 
> stream that it backs. Further, a Source is also required to retain this data 
> across restarts of the process where the Source is instantiated, even when 
> the Source is restarted on a different machine.
> These restrictions make it difficult to implement the Source API, as any 
> implementation requires potentially unbounded amounts of distributed storage.
> See the mailing list thread at 
> [http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-td18551.html]
>  for more information.
> This JIRA will cover augmenting the Source API with an additional callback 
> that will allow Structured Streaming scheduler to notify the source when it 
> is safe to discard buffered data.






[jira] [Created] (SPARK-16963) Change Source API so that sources do not need to keep unbounded state

2016-08-08 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-16963:
---

 Summary: Change Source API so that sources do not need to keep 
unbounded state
 Key: SPARK-16963
 URL: https://issues.apache.org/jira/browse/SPARK-16963
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 2.0.0
Reporter: Frederick Reiss


The version of the Source API in Spark 2.0.0 defines a single getBatch() method 
for fetching records from the source, with the following Scaladoc comments 
defining the semantics:

{noformat}
/**
 * Returns the data that is between the offsets (`start`, `end`]. When `start` 
is `None` then
 * the batch should begin with the first available record. This method must 
always return the
 * same data for a particular `start` and `end` pair.
 */
def getBatch(start: Option[Offset], end: Offset): DataFrame
{noformat}
These semantics mean that a Source must retain all past history for the stream 
that it backs. Further, a Source is also required to retain this data across 
restarts of the process where the Source is instantiated, even when the Source 
is restarted on a different machine.
These restrictions make it difficult to implement the Source API, as any 
implementation requires potentially unbounded amounts of distributed storage.
See the mailing list thread at 
[http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-td18551.html]
 for more information.
This JIRA will cover augmenting the Source API with an additional callback that 
will allow Structured Streaming scheduler to notify the source when it is safe 
to discard buffered data.
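A minimal sketch of the proposed callback, in plain Python rather than the Scala trait (the names and shapes are illustrative assumptions, not Spark's actual API):

```python
# Illustrative sketch (not Spark's actual Source API) of a streaming
# source that buffers records and discards them once the scheduler
# invokes the proposed commit() callback, keeping state bounded.
class BufferedSource:
    def __init__(self):
        self._buffer = []   # list of (offset, record)
        self._next_offset = 0

    def append(self, record):
        self._buffer.append((self._next_offset, record))
        self._next_offset += 1

    def get_batch(self, start, end):
        # Return records with start < offset <= end; start=None means
        # "from the first available record", matching getBatch above.
        lo = -1 if start is None else start
        return [r for (o, r) in self._buffer if lo < o <= end]

    def commit(self, end):
        # Proposed callback: everything at or below `end` has been
        # durably processed downstream and can safely be dropped.
        self._buffer = [(o, r) for (o, r) in self._buffer if o > end]

src = BufferedSource()
for rec in ["a", "b", "c"]:
    src.append(rec)
print(src.get_batch(None, 2))  # ['a', 'b', 'c']
src.commit(1)
print(src.get_batch(1, 2))     # ['c']
```

Without `commit`, the buffer grows without bound, which is exactly the problem described above.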






[jira] [Commented] (SPARK-15370) Some correlated subqueries return incorrect answers

2016-05-20 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15293496#comment-15293496
 ] 

Frederick Reiss commented on SPARK-15370:
-

[~hvanhovell]'s comments on the first PR pointed out some additional cases that 
this fix needs to cover. Thanks for the help! I'm running regression tests on a 
second change set with additional code to cover these other cases.

> Some correlated subqueries return incorrect answers
> ---
>
> Key: SPARK-15370
> URL: https://issues.apache.org/jira/browse/SPARK-15370
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Frederick Reiss
>
> The rewrite introduced in SPARK-14785 has the COUNT bug. The rewrite changes 
> the semantics of some correlated subqueries when there are tuples from the 
> outer query block that do not join with the subquery. For example:
> {noformat}
> spark-sql> create table R(a integer) as values (1);
> spark-sql> create table S(b integer);
> spark-sql> select R.a from R 
>  > where (select count(*) from S where R.a = S.b) = 0;
> Time taken: 2.139 seconds 
>   
> spark-sql> 
> (returns zero rows; the answer should be one row of '1')
> {noformat}
> This problem also affects the SELECT clause:
> {noformat}
> spark-sql> select R.a, 
>  > (select count(*) from S where R.a = S.b) as cnt 
>  > from R;
> 1 NULL
> (the answer should be "1 0")
> {noformat}
> Some subqueries with COUNT aggregates are *not* affected:
> {noformat}
> spark-sql> select R.a from R 
>  > where (select count(*) from S where R.a = S.b) > 0;
> Time taken: 0.609 seconds
> spark-sql>
> (Correct answer)
> spark-sql> select R.a from R 
>  > where (select count(*) + sum(S.b) from S where R.a = S.b) = 0;
> Time taken: 0.553 seconds
> spark-sql> 
> (Correct answer)
> {noformat}
> Other cases can trigger the variant of the COUNT bug for expressions 
> involving NULL checks:
> {noformat}
> spark-sql> select R.a from R 
>  > where (select sum(S.b) is null from S where R.a = S.b);
> (returns zero rows, should return one row)
> {noformat}






[jira] [Commented] (SPARK-15372) TPC-DS Query 84 returns wrong results against TPC official

2016-05-17 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288004#comment-15288004
 ] 

Frederick Reiss commented on SPARK-15372:
-

There is no CONCAT function in the SQL standard. The concatenation operator in 
the standard is ||. DB2 treats "CONCAT" as a synonym for "||"; other vendors do 
different things. The standard states that if either input to the concatenation 
operator is NULL, then the output is NULL (Section 6.28 of the SQL/Foundation document).

The TPC-DS benchmark spec doesn't say anything about the allowable semantics of 
CONCAT, only that "vendor specific syntax can be used".
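The two semantics can be contrasted in a short sketch, with Python's `None` standing in for SQL NULL (a model of the behaviors discussed, not of any engine's implementation):

```python
# std_concat follows the SQL standard's NULL-propagating || operator;
# lenient_concat treats NULL as an empty string, which is the behavior
# the reporter expected from concatenating names.
def std_concat(*parts):
    if any(p is None for p in parts):
        return None   # NULL in, NULL out
    return "".join(parts)

def lenient_concat(*parts):
    return "".join(p if p is not None else "" for p in parts)

print(std_concat("Moore", ", ", None))      # None
print(lenient_concat("Moore", ", ", None))  # Moore, 
```

The lenient behavior is easy to get in standard SQL by wrapping each operand in COALESCE(x, ''), which is effectively what the official answer set reflects for the "Moore" row.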

> TPC-DS Query 84 returns wrong results against TPC official
> -
>
> Key: SPARK-15372
> URL: https://issues.apache.org/jira/browse/SPARK-15372
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: JESSE CHEN
>Assignee: Herman van Hovell
>Priority: Critical
>  Labels: SPARK-15071
> Fix For: 2.0.0
>
>
> The official TPC-DS query 84 returns wrong results when compared to its 
> official answer set.
> The query itself is:
> {noformat}
>   select  c_customer_id as customer_id
>,concat(c_last_name , ', ' , c_first_name) as customername
>  from customer
>  ,customer_address
>  ,customer_demographics
>  ,household_demographics
>  ,income_band
>  ,store_returns
>  where ca_city  =  'Edgewood'
>and c_current_addr_sk = ca_address_sk
>and ib_lower_bound   >=  38128
>and ib_upper_bound   <=  38128 + 5
>and ib_income_band_sk = hd_income_band_sk
>and cd_demo_sk = c_current_cdemo_sk
>and hd_demo_sk = c_current_hdemo_sk
>and sr_cdemo_sk = cd_demo_sk
>  order by c_customer_id
>   limit 100;
> {noformat}
> Spark 2.0 build 0517 returned the following result:
> {noformat}
> AIPG  Carter, Rodney
> AKMBBAAA  Mcarthur, Emma
> CBNHBAAA  Wells, Ron
> DBME  Vera, Tina
> DBME  Vera, Tina
> DHKGBAAA  Scott, Pamela
> EIIBBAAA  Atkins, Susan
> FKAH  Batiste, Ernest
> GHMA  Mitchell, Gregory
> IAODBAAA  Murray, Karen
> IEOK  Solomon, Clyde
> IIBO  Owens, David
> IPDC  Wallace, Eric
> IPIM  Hayward, Benjamin
> JCIK  Ramos, Donald
> KFJE  Roberts, Yvonne
> KPGBBAAA  NULL < ??? questionable row
> LCLABAAA  Whitaker, Lettie
> MGME  Sharp, Michael
> MIGBBAAA  Montgomery, Jesenia
> MPDK  Lopez, Isabel
> NEOM  Powell, Linda
> NKPC  Shaffer, Sergio
> NOCK  Vargas, James
> OGJEBAAA  Owens, Denice
> {noformat}
> Official answer set (which is correct!)
> {noformat}
> AIPG Carter  , Rodney
> AKMBBAAA Mcarthur, Emma
> CBNHBAAA Wells   , Ron
> DBME Vera, Tina
> DBME Vera, Tina
> DHKGBAAA Scott   , Pamela
> EIIBBAAA Atkins  , Susan
> FKAH Batiste , Ernest
> GHMA Mitchell, Gregory
> IAODBAAA Murray  , Karen
> IEOK Solomon , Clyde
> IIBO Owens   , David
> IPDC Wallace , Eric
> IPIM Hayward , Benjamin
> JCIK Ramos   , Donald
> KFJE Roberts , Yvonne
> KPGBBAAA Moore   ,
> LCLABAAA Whitaker, Lettie
> MGME Sharp   , Michael
> MIGBBAAA Montgomery  , Jesenia
> MPDK Lopez   , Isabel
> NEOM Powell  , Linda
> NKPC Shaffer , Sergio
> NOCK Vargas  , James
> OGJEBAAA Owens   , Denice
> {noformat}
> The issue is with the "concat" function in Spark SQL (also behaves the same 
> in Hive). When 'concat' meets any NULL string, it returns NULL as the answer. 
> But is this right? When I concatenate a person's last name and first name, if 
> the first name is missing (empty string or NULL), I should see the last name 
> still, not NULL, i.e., "Smith" + "" = "Smith", not NULL. 
> Simplest 

[jira] [Created] (SPARK-15370) Some correlated subqueries return incorrect answers

2016-05-17 Thread Frederick Reiss (JIRA)
Frederick Reiss created SPARK-15370:
---

 Summary: Some correlated subqueries return incorrect answers
 Key: SPARK-15370
 URL: https://issues.apache.org/jira/browse/SPARK-15370
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0
Reporter: Frederick Reiss


The rewrite introduced in SPARK-14785 has the COUNT bug. The rewrite changes 
the semantics of some correlated subqueries when there are tuples from the 
outer query block that do not join with the subquery. For example:
{noformat}
spark-sql> create table R(a integer) as values (1);
spark-sql> create table S(b integer);
spark-sql> select R.a from R 
 > where (select count(*) from S where R.a = S.b) = 0;
Time taken: 2.139 seconds   
spark-sql> 
(returns zero rows; the answer should be one row of '1')
{noformat}
This problem also affects the SELECT clause:
{noformat}
spark-sql> select R.a, 
 > (select count(*) from S where R.a = S.b) as cnt 
 > from R;
1   NULL
(the answer should be "1 0")
{noformat}
Some subqueries with COUNT aggregates are *not* affected:
{noformat}
spark-sql> select R.a from R 
 > where (select count(*) from S where R.a = S.b) > 0;
Time taken: 0.609 seconds
spark-sql>
(Correct answer)

spark-sql> select R.a from R 
 > where (select count(*) + sum(S.b) from S where R.a = S.b) = 0;
Time taken: 0.553 seconds
spark-sql> 
(Correct answer)
{noformat}

Other cases can trigger the variant of the COUNT bug for expressions involving 
NULL checks:
{noformat}
spark-sql> select R.a from R 
 > where (select sum(S.b) is null from S where R.a = S.b);
(returns zero rows, should return one row)
{noformat}
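The failure mode can be modeled in a few lines of plain Python. This is a sketch of the semantics only, not of Spark's actual plan rewrite:

```python
# Plain-Python model of the COUNT bug above, with R = [1] and S = [].
# The query: select R.a from R where (select count(*) from S where R.a = S.b) = 0.
R, S = [1], []

# Correct semantics: the correlated subquery is evaluated per outer row,
# so an outer row with no matches in S sees count(*) = 0, not NULL.
correct = [a for a in R if sum(1 for b in S if a == b) == 0]

# The rewrite groups S by b and joins R to the result. Model the groups:
groups = {}
for b in S:
    groups[b] = groups.get(b, 0) + 1

# Buggy rewrite (inner join): outer rows with no matching group vanish.
buggy = [a for a in R if a in groups and groups[a] == 0]

# Fixed rewrite (left outer join + coalesce(cnt, 0)): a missing group
# counts as zero, preserving the outer row.
fixed = [a for a in R if groups.get(a, 0) == 0]

print(correct, buggy, fixed)  # [1] [] [1]
```

The COUNT-specific behavior is what makes this tricky: unlike SUM, COUNT over an empty group must yield 0 rather than NULL, so the join rewrite has to fill in that zero for non-matching outer tuples.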






[jira] [Commented] (SPARK-15122) TPC-DS Query 41 fails with The correlated scalar subquery can only contain equality predicates

2016-05-04 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15271874#comment-15271874
 ] 

Frederick Reiss commented on SPARK-15122:
-

In the official version of the query, the expression {{i_manufact = 
i1.i_manufact}} appears twice: once on either side of an {{OR}}. The optimizer 
needs to normalize the expression enough to factor that subexpression out of 
the two sides of the disjunction. Also, the error checking code in 
{{CheckAnalysis.scala}} that triggers the problem needs to trigger *after* that 
normalization. It looks like that check happens before the call to 
{{Optimizer.execute}}.
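The factoring step can be sketched over a toy expression tree, with tuples standing in for Catalyst expressions. This handles only the single shared-conjunct shape, not Spark's real normalization rules:

```python
# Rewrite (A AND B) OR (A AND C) into A AND (B OR C), so that a common
# equality predicate such as i_manufact = i1.i_manufact can be pulled
# out of the disjunction and recognized as an equi-join condition.
def conjuncts(expr):
    # Flatten nested ANDs into a list of leaf predicates.
    if isinstance(expr, tuple) and expr[0] == "AND":
        return conjuncts(expr[1]) + conjuncts(expr[2])
    return [expr]

def factor_disjunction(expr):
    if not (isinstance(expr, tuple) and expr[0] == "OR"):
        return expr
    left, right = conjuncts(expr[1]), conjuncts(expr[2])
    common = [p for p in left if p in right]
    if not common:
        return expr

    def rebuild(preds):
        out = preds[0] if preds else ("TRUE",)
        for p in preds[1:]:
            out = ("AND", out, p)
        return out

    rest_l = rebuild([p for p in left if p not in common])
    rest_r = rebuild([p for p in right if p not in common])
    return ("AND", rebuild(common), ("OR", rest_l, rest_r))

expr = ("OR",
        ("AND", ("EQ", "i_manufact", "i1.i_manufact"), ("PRED", "women")),
        ("AND", ("EQ", "i_manufact", "i1.i_manufact"), ("PRED", "men")))
print(factor_disjunction(expr))
```

With the equality conjunct factored to the top level, a check like the one in CheckAnalysis would then see a plain equality predicate rather than a disjunction.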

> TPC-DS Query 41 fails with The correlated scalar subquery can only contain 
> equality predicates
> -
>
> Key: SPARK-15122
> URL: https://issues.apache.org/jira/browse/SPARK-15122
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: JESSE CHEN
>Priority: Critical
>
> The official TPC-DS query 41 fails with the following error:
> {noformat}
> Error in query: The correlated scalar subquery can only contain equality 
> predicates: (((i_manufact#38 = i_manufact#16) && (i_category#36 = Women) 
> && ((i_color#41 = powder) || (i_color#41 = khaki))) && (((i_units#42 = Ounce) 
> || (i_units#42 = Oz)) && ((i_size#39 = medium) || (i_size#39 = extra 
> large || (((i_category#36 = Women) && ((i_color#41 = brown) || 
> (i_color#41 = honeydew))) && (((i_units#42 = Bunch) || (i_units#42 = Ton)) && 
> ((i_size#39 = N/A) || (i_size#39 = small) || i_category#36 = Men) && 
> ((i_color#41 = floral) || (i_color#41 = deep))) && (((i_units#42 = N/A) || 
> (i_units#42 = Dozen)) && ((i_size#39 = petite) || (i_size#39 = large || 
> (((i_category#36 = Men) && ((i_color#41 = light) || (i_color#41 = 
> cornflower))) && (((i_units#42 = Box) || (i_units#42 = Pound)) && ((i_size#39 
> = medium) || (i_size#39 = extra large))) || ((i_manufact#38 = 
> i_manufact#16) && (i_category#36 = Women) && ((i_color#41 = midnight) || 
> (i_color#41 = snow))) && (((i_units#42 = Pallet) || (i_units#42 = Gross)) && 
> ((i_size#39 = medium) || (i_size#39 = extra large || (((i_category#36 = 
> Women) && ((i_color#41 = cyan) || (i_color#41 = papaya))) && (((i_units#42 = 
> Cup) || (i_units#42 = Dram)) && ((i_size#39 = N/A) || (i_size#39 = small) 
> || i_category#36 = Men) && ((i_color#41 = orange) || (i_color#41 = 
> frosted))) && (((i_units#42 = Each) || (i_units#42 = Tbl)) && ((i_size#39 = 
> petite) || (i_size#39 = large || (((i_category#36 = Men) && ((i_color#41 
> = forest) || (i_color#41 = ghost))) && (((i_units#42 = Lb) || (i_units#42 = 
> Bundle)) && ((i_size#39 = medium) || (i_size#39 = extra large;
> {noformat}
> The output plans showed the following errors
> {noformat}
> == Parsed Logical Plan ==
> 'GlobalLimit 100
> +- 'LocalLimit 100
>+- 'Sort ['i_product_name ASC], true
>   +- 'Distinct
>  +- 'Project ['i_product_name]
> +- 'Filter ((('i_manufact_id >= 738) && ('i_manufact_id <= (738 + 
> 40))) && (scalar-subquery#1 [] > 0))
>:  +- 'SubqueryAlias scalar-subquery#1 []
>: +- 'Project ['count(1) AS item_cnt#0]
>:+- 'Filter ((('i_manufact = 'i1.i_manufact) && 
> ('i_category = Women) && (('i_color = powder) || ('i_color = khaki))) && 
> ((('i_units = Ounce) || ('i_units = Oz)) && (('i_size = medium) || ('i_size = 
> extra large || ((('i_category = Women) && (('i_color = brown) || 
> ('i_color = honeydew))) && ((('i_units = Bunch) || ('i_units = Ton)) && 
> (('i_size = N/A) || ('i_size = small) || 'i_category = Men) && 
> (('i_color = floral) || ('i_color = deep))) && ((('i_units = N/A) || 
> ('i_units = Dozen)) && (('i_size = petite) || ('i_size = large || 
> ((('i_category = Men) && (('i_color = light) || ('i_color = cornflower))) && 
> ((('i_units = Box) || ('i_units = Pound)) && (('i_size = medium) || ('i_size 
> = extra large))) || (('i_manufact = 'i1.i_manufact) && ('i_category = 
> Women) && (('i_color = midnight) || ('i_color = snow))) && ((('i_units = 
> Pallet) || ('i_units = Gross)) && (('i_size = medium) || ('i_size = extra 
> large || ((('i_category = Women) && (('i_color = cyan) || ('i_color = 
> papaya))) && ((('i_units = Cup) || ('i_units = Dram)) && (('i_size = N/A) || 
> ('i_size = small) || 'i_category = Men) && (('i_color = orange) || 
> ('i_color = frosted))) && ((('i_units = Each) || ('i_units = Tbl)) && 
> (('i_size = petite) || ('i_size = large || ((('i_category = Men) && 
> (('i_color = forest) || ('i_color = ghost))) && ((('i_units = Lb) || 
> ('i_units = Bundle)) && (('i_size = medium) || ('i_size = extra large
> 

[jira] [Commented] (SPARK-14781) Support subquery in nested predicates

2016-04-29 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15265000#comment-15265000
 ] 

Frederick Reiss commented on SPARK-14781:
-

Yeah, Distinct will impact performance for the uncorrelated case if the 
subquery returns more than a few million rows. That problem won't occur in the 
particular case of TPC-DS query 45 (the subquery there returns at most 500k 
rows at a 100TB scale factor), but you never know. And of course a Distinct 
after the join, as one would need to cover EXISTS, would see potentially 
billions of rows. I just figured I'd mention that possibility as an expedient 
that doesn't require any additional operators.

I'd be up for adding a "LeftSemiPlus" mode to the various join operators if 
you'd prefer implementation to start with that step. The new behavior is 
almost the same as the existing LeftSemi mode: one additional output column in 
the schema, plus code to emit rows with a null value when nothing on the inner 
side matches an outer tuple.
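The intended semantics can be modeled on plain Python lists; this is an illustrative toy, not Spark code, and the boolean match column stands in for whatever null/true convention the real operator would use:

```python
def left_semi_plus(left, right, condition):
    # For each left row, emit the row plus one extra column recording
    # whether any right row satisfied the join condition. Unlike LeftSemi,
    # non-matching left rows are kept (here with False; Spark might use null).
    out = []
    for l in left:
        matched = any(condition(l, r) for r in right)
        out.append(l + (matched,))
    return out

customers = [(1, "a"), (2, "b"), (3, "c")]
orders = [(10, 1), (11, 1), (12, 3)]  # duplicates on the inner side

result = left_semi_plus(customers, orders, lambda l, r: l[0] == r[1])
# Each left row appears exactly once even when it matches several inner
# rows, which is what distinguishes this from a plain LeftOuter join.
```

Note that the output cardinality always equals the outer cardinality, so no Distinct is needed afterwards.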

> Support subquery in nested predicates
> -
>
> Key: SPARK-14781
> URL: https://issues.apache.org/jira/browse/SPARK-14781
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Davies Liu
>
> Right now, we do not support nested IN/EXISTS subqueries, for example 
> EXISTS( x1) OR EXISTS( x2)
> In order to do that, we could use an internal-only join type SemiPlus, which 
> will output every row from left, plus additional column as the result of join 
> condition. Then we could replace the EXISTS() or IN() by the result column.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-14781) Support subquery in nested predicates

2016-04-29 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15264789#comment-15264789
 ] 

Frederick Reiss commented on SPARK-14781:
-

[~davies]: I think I have a minimally-invasive plan for covering Q45.

*Existing code:* {{RewritePredicateSubquery.apply}} applies to conjunctions of 
predicates in the WHERE clause. When this rule finds an IN predicate with a 
subquery, the rule rewrites the IN predicate into a join.

*Proposed change:* Modify {{RewritePredicateSubquery}} so that it also detects 
disjunctions (ORs) where exactly one child of the disjunction is an IN 
predicate with a non-correlated subquery. Rewrite each such disjunction into a 
left outer join, followed by a Filter. The inner (right) operand of the left 
outer join should be the subquery with an additional Distinct operator above 
it. The Filter will apply the remaining predicates from the disjunction to any 
tuples that did not join with the subquery.
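The proposed rewrite can be checked on toy data. In this illustrative Python sketch (not Spark code), set membership stands in for the left outer join probe against the de-duplicated subquery output:

```python
def or_in_rewrite(outer, subquery_vals, other_pred, in_key):
    # Model of rewriting `other_pred(row) OR in_key(row) IN (subquery)`:
    # left outer join against Distinct(subquery), then a Filter that applies
    # the remaining disjuncts to rows that did not join.
    distinct_vals = set(subquery_vals)      # the Distinct above the subquery
    out = []
    for row in outer:
        joined = in_key(row) in distinct_vals   # left outer join probe
        if joined or other_pred(row):           # the post-join Filter
            out.append(row)
    return out

rows = [("85669", "i1"), ("00000", "i2"), ("00000", "i9")]  # (zip, item_id)
in_list = ["i2", "i2", "i3"]   # duplicates: why the Distinct is needed

kept = or_in_rewrite(rows, in_list,
                     other_pred=lambda r: r[0] == "85669",
                     in_key=lambda r: r[1])
```

Without the Distinct, the duplicated "i2" in the in-list would make the joined row appear twice, changing the query result.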

*Notes:*
The Distinct here is needed because the in-list could contain duplicates. The 
Distinct could be eliminated if there was a join operator that combined the 
behavior of LeftOuter and LeftSemijoin. I suppose that's what SemiPlus will do?

This approach could be extended to cover correlated IN/EXISTS subqueries. The 
rewrite would need to add unique IDs to the outer query's tuples before the 
join + filter, then remove duplicates after the join + filter. I'm *not* 
planning to do this extension in the first pass.

The approach could also be extended to cover multiple subqueries inside a 
disjunction by chaining together multiple outer joins. I'm *not* planning to do 
this extension in the first pass.

*Questions:*
* Do you foresee any problems with this approach?
* There is a second version of the IN/EXISTS subquery rewrite logic in PR 
#12720, but that code hasn't been merged yet. Would you prefer a diff against 
the current head, or a diff against the logic in PR 12720?

> Support subquery in nested predicates
> -
>
> Key: SPARK-14781
> URL: https://issues.apache.org/jira/browse/SPARK-14781
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Davies Liu
>
> Right now, we do not support nested IN/EXISTS subqueries, for example 
> EXISTS( x1) OR EXISTS( x2)
> In order to do that, we could use an internal-only join type SemiPlus, which 
> will output every row from left, plus additional column as the result of join 
> condition. Then we could replace the EXISTS() or IN() by the result column.






[jira] [Commented] (SPARK-14781) Support subquery in nested predicates

2016-04-27 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261474#comment-15261474
 ] 

Frederick Reiss commented on SPARK-14781:
-

Sure, I'd be happy to put something together to cover Q45. Tomorrow is a mess, 
but I'll have time on Friday and Monday. 

> Support subquery in nested predicates
> -
>
> Key: SPARK-14781
> URL: https://issues.apache.org/jira/browse/SPARK-14781
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Davies Liu
>
> Right now, we do not support nested IN/EXISTS subqueries, for example 
> EXISTS( x1) OR EXISTS( x2)
> In order to do that, we could use an internal-only join type SemiPlus, which 
> will output every row from left, plus additional column as the result of join 
> condition. Then we could replace the EXISTS() or IN() by the result column.






[jira] [Commented] (SPARK-14785) Support correlated scalar subquery

2016-04-27 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261157#comment-15261157
 ] 

Frederick Reiss commented on SPARK-14785:
-

Note that the rewritten query in the example above needs an additional filter 
on {{t.b > t3.AVG}}. 

This rewrite is described in [a 1982 
paper|https://pdfs.semanticscholar.org/e8ac/c9f63559a09d608e62c4061520c24d970c31.pdf].
 
This rewrite does not always give correct query results. In particular, the 
rewritten query may be missing results if the subquery contains a {{COUNT}} 
aggregate, or if the subquery sometimes returns {{NULL}} and the filtering 
predicate returns {{TRUE}} on a {{NULL}} input. See the chapter "Query Rewrite 
Optimization Rules in IBM DB2 Universal Database" in _Readings in Database 
Systems_ for more information.

For the purpose of covering TPC-DS, this rewrite should work for queries 6, 32, 
81, and 92. Those queries only use {{AVG}} aggregates in their subqueries.
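The COUNT caveat is easy to reproduce on toy data. This Python sketch (illustrative, not Spark code) contrasts direct evaluation of the correlated predicate with the join rewrite:

```python
from collections import Counter

t  = [(1, "x", 5), (2, "y", 5)]   # (id, a, b); id 2 has no match in t2
t2 = [(1, 3), (1, 4)]             # (id, c)

def direct_eval(t, t2):
    # Direct semantics of `b > (select count(*) from t2 where t2.id = t.id)`:
    # COUNT over an empty match set is 0, so the row with id 2 passes.
    return [a for (id_, a, b) in t
            if b > sum(1 for (id2, _) in t2 if id2 == id_)]

def join_rewrite(t, t2):
    # The 1982-style rewrite: aggregate first, then inner-join on id.
    # Outer rows with no matching group in t2 silently disappear.
    counts = Counter(id2 for (id2, _) in t2)
    return [a for (id_, a, b) in t if id_ in counts and b > counts[id_]]
```

With an AVG aggregate the missing group yields NULL and the predicate is false either way, which is why the rewrite is safe for queries 6, 32, 81, and 92.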

> Support correlated scalar subquery
> --
>
> Key: SPARK-14785
> URL: https://issues.apache.org/jira/browse/SPARK-14785
> Project: Spark
>  Issue Type: New Feature
>Reporter: Davies Liu
>
> For example:
> {code}
> SELECT a from t where b > (select avg(c) from t2 where t.id = t2.id)
> {code}
> it could be rewritten as 
> {code}
> SELECT a FROM t JOIN (SELECT id, AVG(c) FROM t2 GROUP by id) t3 ON t3.id = 
> t.id
> {code}
> TPCDS Q92, Q81, Q6 required this






[jira] [Commented] (SPARK-14781) Support subquery in nested predicates

2016-04-27 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261032#comment-15261032
 ] 

Frederick Reiss commented on SPARK-14781:
-

[~davies] where is the definition of the SemiPlus operator? I don't see that 
anywhere in my copy of the main trunk.

> Support subquery in nested predicates
> -
>
> Key: SPARK-14781
> URL: https://issues.apache.org/jira/browse/SPARK-14781
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Davies Liu
>
> Right now, we do not support nested IN/EXISTS subqueries, for example 
> EXISTS( x1) OR EXISTS( x2)
> In order to do that, we could use an internal-only join type SemiPlus, which 
> will output every row from left, plus additional column as the result of join 
> condition. Then we could replace the EXISTS() or IN() by the result column.






[jira] [Commented] (SPARK-14781) Support subquery in nested predicates

2016-04-27 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261028#comment-15261028
 ] 

Frederick Reiss commented on SPARK-14781:
-

I'm not so sure about Q45. Here's the template for Q45:

{noformat}
[_LIMITA] select [_LIMITB] ca_zip, [GBOBC], sum(ws_sales_price)
 from web_sales, customer, customer_address, date_dim, item
 where ws_bill_customer_sk = c_customer_sk
and c_current_addr_sk = ca_address_sk 
and ws_item_sk = i_item_sk 
and ( substr(ca_zip,1,5) in ('85669', '86197','88274','83405','86475', 
'85392', '85460', '80348', '81792')
  or 
  i_item_id in (select i_item_id
 from item
 where i_item_sk in (2, 3, 5, 7, 11, 13, 17, 19, 
23, 29)
 )
)
and ws_sold_date_sk = d_date_sk
and d_qoy = [QOY] and d_year = [YEAR]
 group by ca_zip, [GBOBC]
 order by ca_zip, [GBOBC]
 [_LIMITC];
{noformat}
This query does contain a subquery inside a disjunction ({{...or i_item_id in 
(select...}}), but that subquery is not correlated. What is needed there is for 
that subquery to be added to the list of noncorrelated subqueries evaluated in 
{{SparkPlan.waitForSubqueries()}} and a placeholder for those query results 
inserted into the plan.
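The two-phase evaluation described above can be sketched in Python (illustrative only; the names are simplified stand-ins for the Q45 tables, not Spark APIs):

```python
# Phase 1: evaluate the noncorrelated subquery once, up front -- the
# analogue of adding it to the list handled by waitForSubqueries().
item = [(2, "i2"), (4, "i4"), (5, "i5")]          # (i_item_sk, i_item_id)
primes = {2, 3, 5, 7, 11, 13, 17, 19, 23, 29}
subquery_result = {iid for (sk, iid) in item if sk in primes}

# Phase 2: the main query treats the result as a constant in-list, exactly
# as if a literal placeholder had been substituted into the plan.
web_sales = [("85669", "i4"), ("11111", "i5"), ("11111", "i4")]  # (zip, item_id)
zips = {"85669"}
kept = [r for r in web_sales if r[0] in zips or r[1] in subquery_result]
```

Because the subquery has no references to the outer row, it is evaluated exactly once regardless of how many outer rows are scanned.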

Q10 and Q35 have correlated EXISTS subqueries inside disjunctions.

> Support subquery in nested predicates
> -
>
> Key: SPARK-14781
> URL: https://issues.apache.org/jira/browse/SPARK-14781
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Davies Liu
>
> Right now, we do not support nested IN/EXISTS subqueries, for example 
> EXISTS( x1) OR EXISTS( x2)
> In order to do that, we could use an internal-only join type SemiPlus, which 
> will output every row from left, plus additional column as the result of join 
> condition. Then we could replace the EXISTS() or IN() by the result column.






[jira] [Commented] (SPARK-10936) UDAF "mode" for categorical variables

2015-10-08 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14949306#comment-14949306
 ] 

Frederick Reiss commented on SPARK-10936:
-

Mode is not an algebraic aggregate.  To find the mode in a single pass over the 
original data, one needs to track the full set of distinct values in the 
underlying set, as well as the number of times each value occurs in the records 
seen so far. For high-cardinality columns, this requirement will result in 
unbounded state.

I can see three ways forward here: 

a) Stuff a hash table into the PartialAggregate2 API's result buffer, and hope 
that this buffer does not exhaust the heap, produce O(n^2) behavior when the 
column cardinality is high, or stop working on future (or present?) versions of 
codegen.

b) Implement an approximate mode with fixed-size intermediate state (for 
example, a compressed reservoir sample), similar to how the current 
HyperLogLog++ aggregate works. Approximate computation of the mode will work 
well most of the time but will have unbounded error in corner cases.

c) Add mode as another member of the "distinct" family of Spark aggregates, 
such as SUM/COUNT/AVERAGE DISTINCT. Use the same pre-Tungsten style of 
processing to handle mode for now.  Create a follow-on JIRA to move mode over 
to the fast path at the same time that the other DISTINCT aggregates switch 
over.

I think that (c) is the best option overall, but I'm happy to defer to others 
with deeper understanding. My thinking is that, while it would be good to have 
a mode aggregate available, mode is a relatively uncommon use case. Slow-path 
processing for mode is ok as a short-term expedient. Once SUM DISTINCT and 
related aggregates are fully moved onto the new framework, transitioning mode 
to the fast path should be easy.
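Option (a)'s state requirement can be sketched as a partial-aggregate protocol. This is an illustrative Python model, not the actual Spark UDAF API:

```python
from collections import Counter

def partial_mode(partition):
    # Per-partition partial aggregate: a full value -> count map.
    # State is O(number of distinct values), which is exactly what makes
    # mode non-algebraic and risks exhausting the heap.
    return Counter(partition)

def merge(buffers):
    # Combine per-partition buffers into one global count map.
    total = Counter()
    for b in buffers:
        total.update(b)
    return total

def final_mode(total):
    # Ties are broken arbitrarily here, as a real UDAF would have to document.
    return total.most_common(1)[0][0]

partitions = [["a", "b", "a"], ["b", "b", "c"]]
result = final_mode(merge(partial_mode(p) for p in partitions))
```

Option (b) would replace the exact Counter with a fixed-size summary (e.g. a compressed reservoir sample), trading unbounded state for unbounded worst-case error.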


> UDAF "mode" for categorical variables
> -
>
> Key: SPARK-10936
> URL: https://issues.apache.org/jira/browse/SPARK-10936
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML, SQL
>Reporter: Xiangrui Meng
>
> This is similar to frequent items except that we don't have a threshold on 
> the frequency. So an exact implementation might require a global shuffle.






[jira] [Commented] (SPARK-6649) DataFrame created through SQLContext.jdbc() failed if columns table must be quoted

2015-05-12 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14540967#comment-14540967
 ] 

Frederick Reiss commented on SPARK-6649:


Did some more digging through the code and edit history.

It looks like the current version of the SQL parser uses the backtick character 
(`) as a delimiter. This change first appeared in SPARK-3483, with additional 
fixes done as part of SPARK-6898. I don't see any explanation for the use of a 
nonstandard quote character, and there doesn't seem to be any relevant 
discussion in the mailing list archives.

I've only found two test cases under org.apache.spark.sql and child packages 
that use double quotes to delimit a string literal.

In SQLQuerySuite.scala:
{noformat}
test("date row") {
checkAnswer(sql(
  """select cast("2015-01-28" as date) from testData limit 1"""),
...
{noformat}

And in TableScanSuite.scala:
{noformat}
  test("SPARK-5196 schema field with comment") {
sql(
  """
   |CREATE TEMPORARY TABLE student(name string comment "SN", age int 
comment "SA", grade int)
   |USING org.apache.spark.sql.sources.AllDataTypesScanSource
   |OPTIONS (
   |  from '1',
   |  to '10'
   |)
...
{noformat}

All the other test cases use single quotes per the SQL standard.

I'm going to assume that the use of double quotes to delimit string literals 
was an oversight and make changes to correct that oversight.

 DataFrame created through SQLContext.jdbc() failed if columns table must be 
 quoted
 --

 Key: SPARK-6649
 URL: https://issues.apache.org/jira/browse/SPARK-6649
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.3.0
Reporter: Frédéric Blanc
Priority: Minor

 If I want to import the contents of a table from Oracle that contains a column 
 named COMMENT (a reserved keyword), I cannot use a DataFrame that maps all 
 the columns of this table.
 {code:title=ddl.sql|borderStyle=solid}
 CREATE TABLE TEST_TABLE (
 COMMENT VARCHAR2(10)
 );
 {code}
 {code:title=test.java|borderStyle=solid}
 SQLContext sqlContext = ...
 DataFrame df = sqlContext.jdbc(databaseURL, "TEST_TABLE");
 df.rdd();   // <- fails if the table contains a column with a reserved 
 keyword
 {code}
 The same problem can be encountered if a reserved keyword is used in the table name.
 The JDBCRDD scala class could be improved if the columnList initializer 
 appended the double quote for each column. (line : 225)






[jira] [Commented] (SPARK-6649) DataFrame created through SQLContext.jdbc() failed if columns table must be quoted

2015-05-04 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14527738#comment-14527738
 ] 

Frederick Reiss commented on SPARK-6649:


I was able to reproduce this problem on Spark 1.3.1. The Spark SQL lexical 
analyzer treats anything enclosed in double quotes as a string. The Spark SQL 
parser only allows strings to be treated as literals, not identifiers.

On the spark-sql command line:
{noformat}
spark-sql> select "hello" as hello, 'world' as world;
hello   world
Time taken: 0.125 seconds, Fetched 1 row(s)

spark-sql> select "hello" as "hello", 'world' as "world";
15/05/04 18:03:05 ERROR SparkSQLDriver: Failed in [select "hello" as "hello", 
'world' as "world"]
org.apache.spark.sql.AnalysisException: cannot recognize input near 'as' 
'"hello"' ',' in selection target; line 1 pos 18
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:254)
[many lines of stack trace]
{noformat}

The same thing happens from the Spark Scala shell:
{noformat}
scala> val df = sqlContext.sql("select \"hello\" as hello, 'world' as world")
df: org.apache.spark.sql.DataFrame = [hello: string, world: string]

scala> val df2 = sqlContext.sql("select \"hello\" as \"hello\", 'world' as 
\"world\"")
org.apache.spark.sql.AnalysisException: cannot recognize input near 'as' 
'"hello"' ',' in selection target; line 1 pos 18
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:254)
at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
[many lines of stack trace]
{noformat}

This behavior is not consistent with the SQL standard, though I suppose it is 
somewhat consistent with MySQL's default behavior.

According to the grammar in the SQL-92 document 
([http://www.contrib.andrew.cmu.edu/~shadow/sql/sql1992.txt]), strings should be 
delimited by single quotes:
{noformat}
<character string literal> ::=
  [ <introducer><character set specification> ]
  <quote> [ <character representation>... ] <quote>
    [ { <separator>... <quote> [ <character representation>... ] 
    <quote> }... ]
...
<national character string literal> ::=
  N <quote> [ <character representation>... ] <quote>
    [ { <separator>... <quote> [ <character representation>... ] 
    <quote> }... ]

<bit string literal> ::=
  B <quote> [ <bit>... ] <quote>
    [ { <separator>... <quote> [ <bit>... ] <quote> }... ]

<hex string literal> ::=
  X <quote> [ <hexit>... ] <quote>
    [ { <separator>... <quote> [ <hexit>... ] <quote> }... ]
...
<quote> ::= '
{noformat}
and identifiers *may* be delimited with double quotes:
{noformat}
<delimited identifier> ::=
  <double quote> <delimited identifier body> <double quote>
...
<double quote> ::= "
{noformat}

Thoughts? Are there any pull requests in flight that fix this problem already?
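For comparison, SQLite follows the SQL-92 convention here (single quotes for string literals, double quotes for delimited identifiers), which can be checked directly from Python's standard library:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Double quotes delimit an identifier, so a reserved-looking word
# like COMMENT can name a column.
conn.execute('CREATE TABLE test_table ("COMMENT" VARCHAR(10))')
conn.execute("INSERT INTO test_table VALUES ('hi')")

# Single quotes delimit the string literal; double quotes reference the column.
row = conn.execute('SELECT \'world\', "COMMENT" FROM test_table').fetchone()
```

This is the behavior that a standard-compliant Spark SQL parser would exhibit: `"hello"` resolves as an identifier, not a string.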

 DataFrame created through SQLContext.jdbc() failed if columns table must be 
 quoted
 --

 Key: SPARK-6649
 URL: https://issues.apache.org/jira/browse/SPARK-6649
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.3.0
Reporter: Frédéric Blanc
Priority: Minor

 If I want to import the contents of a table from Oracle that contains a column 
 named COMMENT (a reserved keyword), I cannot use a DataFrame that maps all 
 the columns of this table.
 {code:title=ddl.sql|borderStyle=solid}
 CREATE TABLE TEST_TABLE (
 COMMENT VARCHAR2(10)
 );
 {code}
 {code:title=test.java|borderStyle=solid}
 SQLContext sqlContext = ...
 DataFrame df = sqlContext.jdbc(databaseURL, "TEST_TABLE");
 df.rdd();   // <- fails if the table contains a column with a reserved 
 keyword
 {code}
 The same problem can be encountered if a reserved keyword is used in the table name.
 The JDBCRDD scala class could be improved if the columnList initializer 
 appended the double quote for each column. (line : 225)






[jira] [Commented] (SPARK-7043) KryoSerializer cannot be used with REPL to interpret code in which case class definition and its shipping are in the same line

2015-05-04 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14527683#comment-14527683
 ] 

Frederick Reiss commented on SPARK-7043:


Both issues appear to have the same root cause: The interpreter's rewrites 
create a class with a transient reference to the Spark context, and Kryo 
creates a second context during deserialization.

 KryoSerializer cannot be used with REPL to interpret code in which case class 
 definition and its shipping are in the same line
 --

 Key: SPARK-7043
 URL: https://issues.apache.org/jira/browse/SPARK-7043
 Project: Spark
  Issue Type: Bug
  Components: Spark Shell
Affects Versions: 1.3.1
 Environment: Ubuntu 14.04, no hadoop
Reporter: Peng Cheng
Priority: Minor
  Labels: classloader, kryo
   Original Estimate: 48h
  Remaining Estimate: 48h

 When deploying Spark-shell with
 spark.serializer=org.apache.spark.serializer.KryoSerializer option. 
 Spark-shell cannot execute the following code (in 1 line):
 case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 
 10).collect()
 This problem won't exist for either JavaSerializer or code splitted into 2 
 lines. The only possible explanation is that KryoSerializer is using a 
 ClassLoader that is not registered as an subsidiary ClassLoader of the one in 
 REPL.
 A dirty fix would be just breaking input by semicolon, but it's better to 
 fix the ClassLoader to avoid other liabilities.






[jira] [Commented] (SPARK-7043) KryoSerializer cannot be used with REPL to interpret code in which case class definition and its shipping are in the same line

2015-05-04 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14527575#comment-14527575
 ] 

Frederick Reiss commented on SPARK-7043:


I did some looking into this issue. Here's a summary of what I've found:

The interpreter rewrites each command into a pair of generated classes, one for 
the read phase and one for the eval phase of executing the statement.

For the command:
{{case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 
10).collect()}}
this rewrite process produces the following class (simplified here by removing 
empty constructors) to cover the read phase:

{noformat}
class $read extends Serializable {
  class $iwC extends Serializable {
val $VAL10 = $line3.$read.INSTANCE;
import $VAL10.$iw.$iw.sc;
class $iwC extends Serializable {
  case class Foo extends scala.Product with scala.Serializable {
    <caseaccessor> <paramaccessor> val i: Int = _;
  };
  val ret = sc.parallelize(1.to(100).map(Foo), 10).collect()
};
val $iw = new $iwC()
  };
  val $iw = new $iwC()
}
{noformat}

Note the lines
{{val $VAL10 = $line3.$read.INSTANCE;}}
{{import $VAL10.$iw.$iw.sc;}}
These lines pull the symbol {{sc}} into the scope of the class {{$read.$iwC}}. 
They also happen to give the class {{$read.$iwC}} a field of type 
{{$line3.$read.$iwC}}. I haven't been able to get at the definition of the 
generated class or object {{$line3}}, but it appears that {{$line3}} is a 
rewritten version of the Scala code
{noformat}
@transient val sc = {
  val _sc = 
org.apache.spark.repl.Main.interp.createSparkContext()
  println("Spark context available as sc.")
  _sc
}
{noformat}
which runs whenever the interpreter starts up.

The generated code for the current line gets further bundled inside a generated 
object or class (I'm not sure which) that represents the current line's state 
transitions. In the trace I'm looking at, this object/class is called 
{{$line19}}.

As a result, the case class {{Foo}} from the command line turns into the case 
class {{$line19.$read.$iwC.$iwC.Foo}}, which the Scala compiler turns into a 
Java inner class by the same name.

The call to {{sc.parallelize(1.to(100).map(Foo), 10)}} causes Spark to 
serialize an array of instances of {{$line19.$read.$iwC.$iwC.Foo}}. The 
serializer serializes each of these inner class objects, along with their 
parent class objects. During this serialization process, the serializer skips 
the pointer to the SparkContext, since that pointer is marked as transient.

Inside of the call to {{collect()}}, the deserializer reconstructs instances of 
{{$line19.$read.$iwC.$iwC.Foo}}. Before creating such an instance, the 
deserializer needs to create an enclosing instance of 
{{$line19.$read.$iwC.$iwC}}, which needs an enclosing instance of 
{{$line19.$read.$iwC}}. The class {{$line19.$read.$iwC}} contains a field 
({{val $VAL10 = $line3.$read.INSTANCE}}) that has a pointer to a field of an 
object of type {{$line3.$read.$iwC}}. The type {{$line3.$read.$iwC}} in turn 
has a transient field called sc somewhere underneath it.

At some point during initializing the object of type {{$line3.$read.$iwC}}, the 
Kryo deserializer makes a call to the static initializer for the transient 
field sc. This static initializer calls 
{{org.apache.spark.repl.Main.interp.createSparkContext()}}, which attempts and 
fails to create a second SparkContext. The deserialization process fails with 
an exception.

I'm not sure exactly why the Java deserializer doesn't crash in the same way. I 
suspect that, when deserializing the transient field sc, Java's deserializer 
sets that field to null and moves on.

I'm not sure which way of initializing the transient field is correct. The 
serialization spec 
([http://docs.oracle.com/javase/7/docs/platform/serialization/spec/serialTOC.html])
 is vague about default values for transient fields.
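The two deserializer behaviors can be mimicked in Python with pickle: the `__getstate__` hook skips a "transient" field, and `__setstate__` chooses between restoring it as None (the Java-style outcome described above) or eagerly re-running an initializer (the Kryo-style outcome that fails when the initializer tries to create a second context). The class and field names are illustrative:

```python
import pickle

class Handle:
    # A picklable object with a 'transient' connection-like field.
    def __init__(self):
        self.data = [1, 2, 3]
        self.conn = object()   # stand-in for a SparkContext: not serializable

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["conn"]      # skip the transient field, as Java/Kryo both do
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.conn = None       # Java-style: the transient field comes back null
        # A deserializer that instead re-ran the field's initializer here
        # (Kryo-style) could fail, e.g. by creating a second context.

h2 = pickle.loads(pickle.dumps(Handle()))
```

The data survives the round trip while the transient field does not, which is the behavior the Java deserializer appears to exhibit.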


 KryoSerializer cannot be used with REPL to interpret code in which case class 
 definition and its shipping are in the same line
 --

 Key: SPARK-7043
 URL: https://issues.apache.org/jira/browse/SPARK-7043
 Project: Spark
  Issue Type: Bug
  Components: Spark Shell
Affects Versions: 1.3.1
 Environment: Ubuntu 14.04, no hadoop
Reporter: Peng Cheng
Priority: Minor
  Labels: classloader, kryo
   Original Estimate: 48h
  Remaining Estimate: 48h

 When deploying Spark-shell with
 spark.serializer=org.apache.spark.serializer.KryoSerializer option. 
 Spark-shell cannot execute the following code (in 1 line):
 case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 
 10).collect()
 

[jira] [Commented] (SPARK-7273) The SQLContext.jsonFile() api has a problem when load a format json file?

2015-05-01 Thread Frederick Reiss (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14523384#comment-14523384
 ] 

Frederick Reiss commented on SPARK-7273:


The error in the description indicates that there is a character in the middle 
of the first line of the JSON file that TextInputFormat treats as a line 
separator, so Spark sees the JSON content as two separate records.

I can think of two potential causes:
a) Steven's JSON content has been run through a pretty-printing function, and there 
is a newline character between the two parts of the JSON object, or
b) Steven's local Hadoop/YARN configuration has a nonstandard setting for 
textinputformat.record.delimiter

[~jiege]: Can you share a copy of your JSON file?

Technical details:
SQLContext.jsonFile() makes a call to org.apache.spark.sql.json.DefaultSource, 
which delegates the task to org.apache.spark.sql.json.JSONRelation, which uses 
SparkContext.textFile() to open the JSON file. SparkContext.textFile() uses 
TextInputFormat to read the file.
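The failure mode is easy to reproduce outside Spark: a per-line reader only succeeds when each line is a complete JSON value. This Python sketch mimics the code path (illustrative only, not the Spark implementation):

```python
import json

pretty = '{ "name": "steven",\n"age" : 20\n}'   # pretty-printed: three lines

def parse_json_lines(text):
    # Mimic jsonFile(): TextInputFormat hands the parser one line at a time,
    # and each line must parse as a complete JSON value on its own.
    records, bad = [], []
    for line in text.splitlines():
        try:
            records.append(json.loads(line))
        except ValueError:
            bad.append(line)
    return records, bad

records, bad = parse_json_lines(pretty)        # every line fails to parse

one_line = '{ "name": "steven", "age": 20 }'   # the same object on one line
records2, bad2 = parse_json_lines(one_line)    # parses cleanly
```

Collapsing each JSON object onto a single line (or changing `textinputformat.record.delimiter`) resolves the reported error.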

 The SQLContext.jsonFile() api has a problem when load a format json file?
 -

 Key: SPARK-7273
 URL: https://issues.apache.org/jira/browse/SPARK-7273
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.3.1
Reporter: steven
Priority: Minor

 my code as follow:
  val df = sqlContext.jsonFile("test.json");
 test.json content is:
  { "name": "steven",
 "age" : 20
 }
 the jsonFile invocation will get an Exception as follows:
   java.lang.RuntimeException: Failed to parse record "age" : 20}. 
 Please make sure that each line of the file (or each string in the RDD) is a 
 valid JSON object or an array of JSON objects.
   at scala.sys.package$.error(package.scala:27)
   at 
 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:313)
   at 
 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:307)
 is it a bug?


