[jira] [Commented] (SPARK-23908) High-order function: transform(array, function) → array
[ https://issues.apache.org/jira/browse/SPARK-23908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549628#comment-16549628 ]

Frederick Reiss commented on SPARK-23908:
-----------------------------------------

Thanks Herman, looking forward to seeing this feature!

> High-order function: transform(array, function) → array
> --------------------------------------------------------
>
>                 Key: SPARK-23908
>                 URL: https://issues.apache.org/jira/browse/SPARK-23908
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Xiao Li
>            Assignee: Herman van Hovell
>            Priority: Major
>
> Ref: https://prestodb.io/docs/current/functions/array.html
> Returns an array that is the result of applying function to each element of array:
> {noformat}
> SELECT transform(ARRAY [], x -> x + 1); -- []
> SELECT transform(ARRAY [5, 6], x -> x + 1); -- [6, 7]
> SELECT transform(ARRAY [5, NULL, 6], x -> COALESCE(x, 0) + 1); -- [6, 1, 7]
> SELECT transform(ARRAY ['x', 'abc', 'z'], x -> x || '0'); -- ['x0', 'abc0', 'z0']
> SELECT transform(ARRAY [ARRAY [1, NULL, 2], ARRAY [3, NULL]], a -> filter(a, x -> x IS NOT NULL)); -- [[1, 2], [3]]
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
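The Presto semantics quoted in the issue above can be sketched in plain Python (this is not Spark's implementation; {{transform}} and {{coalesce}} here are illustrative stand-ins operating on Python lists, with SQL NULL modeled as None):

```python
# Illustrative model of transform(array, function): apply the lambda to each
# element and return a new array. NULL (None) elements are passed through to
# the lambda untouched, which is why COALESCE is needed in the third example.

def transform(arr, fn):
    """Apply fn to every element of arr, like Presto's transform()."""
    return [fn(x) for x in arr]

def coalesce(*args):
    """Return the first non-None argument, like SQL COALESCE."""
    return next((a for a in args if a is not None), None)

print(transform([], lambda x: x + 1))                          # []
print(transform([5, 6], lambda x: x + 1))                      # [6, 7]
print(transform([5, None, 6], lambda x: coalesce(x, 0) + 1))   # [6, 1, 7]
print(transform(['x', 'abc', 'z'], lambda x: x + '0'))         # ['x0', 'abc0', 'z0']
# Nested case: the lambda itself filters NULLs out of each inner array.
print(transform([[1, None, 2], [3, None]],
                lambda a: [x for x in a if x is not None]))    # [[1, 2], [3]]
```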
[jira] [Commented] (SPARK-23908) High-order function: transform(array, function) → array
[ https://issues.apache.org/jira/browse/SPARK-23908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16548421#comment-16548421 ]

Frederick Reiss commented on SPARK-23908:
-----------------------------------------

This Jira is marked as "in progress" with the target set to a previous release of Spark. Are you working on this, [~hvanhovell]?

> High-order function: transform(array, function) → array
> --------------------------------------------------------
>
>                 Key: SPARK-23908
>                 URL: https://issues.apache.org/jira/browse/SPARK-23908
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Xiao Li
>            Assignee: Herman van Hovell
>            Priority: Major
[jira] [Commented] (SPARK-20964) Make some keywords reserved along with the ANSI/SQL standard
[ https://issues.apache.org/jira/browse/SPARK-20964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16408578#comment-16408578 ]

Frederick Reiss commented on SPARK-20964:
-----------------------------------------

SQL:2016 prohibits the use of reserved words in unquoted identifiers. See [http://jtc1sc32.org/doc/N2301-2350/32N2311T-text_for_ballot-CD_9075-2.pdf], page 176:

_26) The case-normal form of <regular identifier> shall not be equal, according to the comparison rules in Subclause 8.2, "<comparison predicate>", to any <reserved word> (with every letter that is a lower-case letter replaced by the corresponding upper-case letter or letters), treated as the repetition of a <character string literal> that specifies a <character set specification> of SQL_IDENTIFIER._

That said, strict enforcement of that rule will break existing queries every time a new reserved word is added. That factor in turn leads users to start quoting *every* identifier as a defensive measure. You end up with SQL like this:
{{SELECT "R"."A", "S"."B"}}
{{ FROM "MYSCHEMA"."R" as "R", "MYSCHEMA"."S" as "S"}}

DB2 allows reserved words to be used unquoted; for example, {{SELECT * FROM SYSIBM.SYSTABLES WHERE, WHERE}} (see [https://www.ibm.com/support/knowledgecenter/en/SSEPEK_11.0.0/sqlref/src/tpc/db2z_reservedwords.html]), which lets users use a more literate form of SQL.

> Make some keywords reserved along with the ANSI/SQL standard
> ------------------------------------------------------------
>
>                 Key: SPARK-20964
>                 URL: https://issues.apache.org/jira/browse/SPARK-20964
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: Takeshi Yamamuro
>            Priority: Minor
>
> The current Spark has many non-reserved words that are essentially reserved
> in the ANSI/SQL standard
> (http://developer.mimer.se/validator/sql-reserved-words.html).
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4#L709
> This is because there are many datasources (for instance twitter4j) that
> unfortunately use reserved keywords for column names (See [~hvanhovell]'s
> comments: https://github.com/apache/spark/pull/18079#discussion_r118842186).
> We might fix this issue in future major releases.
[jira] [Updated] (SPARK-21084) Improvements to dynamic allocation for notebook use cases
[ https://issues.apache.org/jira/browse/SPARK-21084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Frederick Reiss updated SPARK-21084:
------------------------------------
    Description:

One important application of Spark is to support many notebook users with a single YARN or Spark Standalone cluster. We at IBM have seen this requirement across multiple deployments of Spark: on-premises and private cloud deployments at our clients, as well as on the IBM cloud. The scenario goes something like this: "Every morning at 9am, 500 analysts log into their computers and start running Spark notebooks intermittently for the next 8 hours." I'm sure that many other members of the community are interested in making similar scenarios work.

Dynamic allocation is supposed to support these kinds of use cases by shifting cluster resources towards users who are currently executing scalable code. In our own testing, we have encountered a number of issues with using the current implementation of dynamic allocation for this purpose:

*Issue #1: Starvation.* A Spark job acquires all available containers, preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that have cached RDD partitions hold onto executors indefinitely, denying those resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.* Applications repeatedly lose partitions of cached RDDs because the underlying executors are removed; the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making enhancements to Spark. Here's a high-level summary of the current planned work:
* [SPARK-21097]: Preserve an executor's cached data when shutting down the executor.
* (JIRA TBD): Make Spark give up executors in a controlled fashion when the RM indicates it is running low on capacity.
* (JIRA TBD): Reduce the delay for dynamic allocation to spin up new executors.

Note that this overall plan is subject to change, and other members of the community should feel free to suggest changes and to help out.

> Improvements to dynamic allocation for notebook use cases
> ---------------------------------------------------------
>
>                 Key: SPARK-21084
>                 URL: https://issues.apache.org/jira/browse/SPARK-21084
>             Project: Spark
>          Issue Type: Umbrella
>          Components: Block Manager, Scheduler, Spark Core, YARN
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Frederick Reiss
[jira] [Updated] (SPARK-21084) Improvements to dynamic allocation for notebook use cases
[ https://issues.apache.org/jira/browse/SPARK-21084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Frederick Reiss updated SPARK-21084:
------------------------------------
    Description:

One important application of Spark is to support many notebook users with a single YARN or Spark Standalone cluster. We at IBM have seen this requirement across multiple deployments of Spark: on-premises and private cloud deployments at our clients, as well as on the IBM cloud. The scenario goes something like this: "Every morning at 9am, 500 analysts log into their computers and start running Spark notebooks intermittently for the next 8 hours." I'm sure that many other members of the community are interested in making similar scenarios work.

Dynamic allocation is supposed to support these kinds of use cases by shifting cluster resources towards users who are currently executing scalable code. In our own testing, we have encountered a number of issues with using the current implementation of dynamic allocation for this purpose:

*Issue #1: Starvation.* A Spark job acquires all available containers, preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that have cached RDD partitions hold onto executors indefinitely, denying those resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.* Applications repeatedly lose partitions of cached RDDs because the underlying executors are removed; the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making enhancements to Spark. Here's a high-level summary of the current set of planned enhancements:
* [SPARK-21097]: Preserve an executor's cached data when shutting down the executor

Note that this overall plan is subject to change, and other members of the community should feel free to suggest changes and to help out.

> Improvements to dynamic allocation for notebook use cases
> ---------------------------------------------------------
>
>                 Key: SPARK-21084
>                 URL: https://issues.apache.org/jira/browse/SPARK-21084
>             Project: Spark
>          Issue Type: Umbrella
>          Components: Block Manager, Scheduler, Spark Core, YARN
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Frederick Reiss
[jira] [Commented] (SPARK-21084) Improvements to dynamic allocation for notebook use cases
[ https://issues.apache.org/jira/browse/SPARK-21084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049669#comment-16049669 ]

Frederick Reiss commented on SPARK-21084:
-----------------------------------------

[~sowen] thanks for having a look at this JIRA and giving feedback! I must confess that, when our product groups first brought these issues to my attention, my initial response was similar to yours. Each of the issues described above can be fixed *in isolation* by reconfiguring Spark and/or the resource manager. The problem is that every such fix makes the other issues worse. We spent a number of weeks playing best-practices whack-a-mole before resigning ourselves to making some targeted improvements to Spark itself. I'll update the description of this JIRA in a moment with a high-level description of the Spark changes we're currently looking into. In the meantime, here's a quick summary of what we ran into while attempting to devise a workable configuration of dynamic allocation for notebook users:

Issue #1 (starvation): The obvious fix here is preemption. But there is currently no way to preempt an executor gently. The only option is to shut down the executor and drop its data, which leads to issues #2 and #4. Worse, Spark's scheduling and cache management are opaque to the resource manager, so the RM makes arbitrary choices of which executor to shoot. Another approach is to configure {{spark.dynamicAllocation.cachedExecutorIdleTimeout}} so that notebook sessions voluntarily give up executors, even when those executors have cached data. But this leads to issues #2 and #4.

Issue #2 (request latency): This issue has two root causes:
a) It takes a noticeable amount of time to start and ramp up new executors.
b) Spark defers the cost of issue #4 (losing cached data) until a job attempts to consume the missing data.
For root cause (a), the obvious solution is to reserve a permanent minimum pool of executors for each notebook user by setting the {{spark.dynamicAllocation.minExecutors}} parameter to a sufficiently high value. But tying down containers in this way leaves fewer resources for other users, exacerbating issues #1, #3, and #4. The reserved executors are likely to be idle most of the time, because notebook users alternate between running Spark jobs, running local computation in the notebook kernel, and looking at results in the web browser. See issue #4 below for what happens when you try to address root cause (b) with config changes.

Issue #3 (unfair allocation of CPU): The obvious fix here is to set {{spark.dynamicAllocation.cachedExecutorIdleTimeout}} so that notebook sessions voluntarily give up executors, even when those executors have cached data. But this leads to issues #2 and #4. One can also reduce the value of {{spark.dynamicAllocation.maxExecutors}}, but that puts a cap on the degree of parallelism that a given user can access, leading to more of issue #2.

Issue #4 (loss of cached data): The obvious fix here is to set {{spark.dynamicAllocation.cachedExecutorIdleTimeout}} to infinity. But then any notebook user who has called RDD.cache() at some point in the past will tie down a large pool of containers indefinitely, leading to issues #1, #2, and #3. If you attempt to limit the size of this large pool by reducing {{spark.dynamicAllocation.maxExecutors}}, you limit the peak performance that the notebook user can get out of Spark, leading to issue #2.

> Improvements to dynamic allocation for notebook use cases
> ---------------------------------------------------------
>
>                 Key: SPARK-21084
>                 URL: https://issues.apache.org/jira/browse/SPARK-21084
>             Project: Spark
>          Issue Type: Umbrella
>          Components: Block Manager, Scheduler, Spark Core, YARN
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Frederick Reiss
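The tension described in that comment can be summarized in one {{spark-defaults.conf}} fragment. This is purely illustrative: the property names are real dynamic-allocation settings, but the values shown are placeholders, and each line's side effect on the four issues is noted:

```
# Illustrative only: each setting mitigates one issue while aggravating others.
spark.dynamicAllocation.enabled                   true
# Mitigates #2a (ramp-up latency) by pinning a pool of executors; worsens
# #1/#3 because the mostly-idle reserved containers starve other users.
spark.dynamicAllocation.minExecutors              8
# Mitigates #1/#3 (one user cannot grab the whole cluster); worsens #2 by
# capping the parallelism a single notebook can reach.
spark.dynamicAllocation.maxExecutors              32
# Mitigates #1/#3 (executors holding cached data are eventually released);
# worsens #2/#4 because released executors drop their cached RDD partitions.
spark.dynamicAllocation.cachedExecutorIdleTimeout 600s
```

No single assignment of values resolves all four issues, which is the motivation for the targeted Spark changes this umbrella JIRA proposes.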
[jira] [Updated] (SPARK-21084) Improvements to dynamic allocation for notebook use cases
[ https://issues.apache.org/jira/browse/SPARK-21084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Frederick Reiss updated SPARK-21084:
------------------------------------
    Description:

One important application of Spark is to support many notebook users with a single YARN or Spark Standalone cluster. We at IBM have seen this requirement across multiple deployments of Spark: on-premises and private cloud deployments at our clients, as well as on the IBM cloud. The scenario goes something like this: "Every morning at 9am, 500 analysts log into their computers and start running Spark notebooks intermittently for the next 8 hours." I'm sure that many other members of the community are interested in making similar scenarios work.

Dynamic allocation is supposed to support these kinds of use cases by shifting cluster resources towards users who are currently executing scalable code. In our own testing, we have encountered a number of issues with using the current implementation of dynamic allocation for this purpose:

*Issue #1: Starvation.* A Spark job acquires all available containers, preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that have cached RDD partitions hold onto executors indefinitely, denying those resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.* Applications repeatedly lose partitions of cached RDDs because the underlying executors are removed; the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making enhancements to Spark.

> Improvements to dynamic allocation for notebook use cases
> ---------------------------------------------------------
>
>                 Key: SPARK-21084
>                 URL: https://issues.apache.org/jira/browse/SPARK-21084
>             Project: Spark
>          Issue Type: Umbrella
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Frederick Reiss
[jira] [Created] (SPARK-21084) Improvements to dynamic allocation for notebook use cases
Frederick Reiss created SPARK-21084:
---------------------------------------

             Summary: Improvements to dynamic allocation for notebook use cases
                 Key: SPARK-21084
                 URL: https://issues.apache.org/jira/browse/SPARK-21084
             Project: Spark
          Issue Type: Umbrella
          Components: Spark Core
    Affects Versions: 2.2.0
            Reporter: Frederick Reiss

One important application of Spark is to support many notebook users with a single YARN or Spark Standalone cluster. We at IBM have seen this requirement across multiple deployments of Spark: on-premises and private cloud deployments at our clients, as well as on the IBM cloud. The scenario goes something like this: "Every morning at 9am, 500 analysts log into their computers and start running Spark notebooks intermittently for the next 8 hours." I'm sure that many other members of the community are interested in making similar scenarios work.

Dynamic allocation is supposed to support these kinds of use cases by shifting cluster resources towards users who are currently executing scalable code. In our own testing, we have encountered a number of issues with using the current implementation of dynamic allocation for this purpose:

*Issue #1: Starvation.* A Spark job acquires all available YARN containers, preventing other jobs or applications from starting.
*Issue #2: Request latency.* Jobs that would normally finish in less than 30 seconds take 2-4x longer than normal with dynamic allocation.
*Issue #3: Unfair resource allocation due to cached data.* Applications that have cached RDD partitions hold onto executors indefinitely, denying those resources to other applications.
*Issue #4: Loss of cached data leads to thrashing.* Applications repeatedly lose partitions of cached RDDs because the underlying executors are removed; the applications then need to rerun expensive computations.

This umbrella JIRA covers efforts to address these issues by making enhancements to Spark.
[jira] [Commented] (SPARK-18127) Add hooks and extension points to Spark
[ https://issues.apache.org/jira/browse/SPARK-18127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983872#comment-15983872 ]

Frederick Reiss commented on SPARK-18127:
-----------------------------------------

Is there a design document or a public design and requirements discussion associated with this JIRA?

> Add hooks and extension points to Spark
> ---------------------------------------
>
>                 Key: SPARK-18127
>                 URL: https://issues.apache.org/jira/browse/SPARK-18127
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>            Reporter: Srinath
>            Assignee: Sameer Agarwal
>             Fix For: 2.2.0
>
> As a Spark user I want to be able to customize my spark session. I currently want to be able to do the following things:
> # I want to be able to add custom analyzer rules. This allows me to implement my own logical constructs; an example of this could be a recursive operator.
> # I want to be able to add my own analysis checks. This allows me to catch problems with spark plans early on. An example of this can be some datasource-specific checks.
> # I want to be able to add my own optimizations. This allows me to optimize plans in different ways, for instance when you use a very different cluster (for example a one-node X1 instance). This supersedes the current {{spark.experimental}} methods.
> # I want to be able to add my own planning strategies. This supersedes the current {{spark.experimental}} methods. This allows me to plan my own physical plan; an example of this would be to plan my own heavily integrated data source (CarbonData for example).
> # I want to be able to use my own customized SQL constructs. An example of this would be supporting my own dialect, or being able to add constructs to the current SQL language. I should not have to implement a complete parser, and should be able to delegate to an underlying parser.
> # I want to be able to track modifications and calls to the external catalog. I want this API to be stable. This allows me to synchronize with other systems.
> This API should modify the SparkSession when the session gets started, and it should NOT change the session in flight.
[jira] [Commented] (SPARK-13534) Implement Apache Arrow serializer for Spark DataFrame for use in DataFrame.toPandas
[ https://issues.apache.org/jira/browse/SPARK-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15596756#comment-15596756 ]

Frederick Reiss commented on SPARK-13534:
-----------------------------------------

We ([~bryanc], [~holdenk], [~yinxusen], and myself) are looking into this. Here's a rough outline of the current planned approach:
- Add a dependency on Arrow 0.1's Java and Scala APIs to Spark.
- Add a new developer API method to Dataset, {{collectAsArrow()}}, that returns an array of byte arrays, where each byte array contains a block of records in Arrow format. The conversion to Arrow will be a streamlined version of the Parquet conversion in {{ParquetWriteSupport}} (minus all the callbacks and levels of indirection). Conversion of complex types (Struct, Array, Map) to Arrow will not be supported in this version.
- Modify PySpark's {{DataFrame.toPandas}} method to use the following logic:
{noformat}
if (the schema of the DataFrame does not contain complex types)
    Call collectAsArrow() on the underlying Scala Dataset.
    Pull the resulting buffers of Arrow data over to the Python process.
    Use Arrow's Python APIs to convert the buffers into a single Pandas dataframe.
else
    Use the existing code as a slow-path conversion.
{noformat}

> Implement Apache Arrow serializer for Spark DataFrame for use in DataFrame.toPandas
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-13534
>                 URL: https://issues.apache.org/jira/browse/SPARK-13534
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>            Reporter: Wes McKinney
>
> The current code path for accessing Spark DataFrame data in Python using PySpark passes through an inefficient serialization-deserialization process that I've examined at a high level here: https://gist.github.com/wesm/0cb5531b1c2e346a0007. Currently, RDD[Row] objects are being deserialized in pure Python as a list of tuples, which are then converted to pandas.DataFrame using its {{from_records}} alternate constructor. This also uses a large amount of memory.
> For flat (no nested types) schemas, the Apache Arrow memory layout (https://github.com/apache/arrow/tree/master/format) can be deserialized to {{pandas.DataFrame}} objects with comparatively small overhead compared with memcpy / system memory bandwidth -- Arrow's bitmasks must be examined, replacing the corresponding null values with pandas's sentinel values (None or NaN as appropriate).
> I will be contributing patches to Arrow in the coming weeks for converting between Arrow and pandas in the general case, so if Spark can send Arrow memory to PySpark, we will hopefully be able to increase the Python data access throughput by an order of magnitude or more. I propose to add a new serializer for Spark DataFrame and a new method that can be invoked from PySpark to request an Arrow memory-layout byte stream, prefixed by a data header indicating array buffer offsets and sizes.
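The fast-path/slow-path dispatch sketched in the comment's pseudocode can be modeled in plain Python. The names {{collect_as_arrow}}, {{arrow_to_pandas}}, and {{slow_path}} are hypothetical stand-ins for the internals described above, not real PySpark APIs:

```python
# Sketch of the proposed toPandas() dispatch: take the Arrow fast path only
# when no column has a complex (Struct/Array/Map) type; otherwise fall back
# to the existing row-based conversion. The callables are stand-ins.

COMPLEX_TYPES = {"struct", "array", "map"}

def to_pandas(field_types, collect_as_arrow, arrow_to_pandas, slow_path):
    """field_types maps column name -> type name string."""
    if any(t in COMPLEX_TYPES for t in field_types.values()):
        # Complex types are unsupported by the Arrow converter in this plan.
        return slow_path()
    # Fast path: collect Arrow record batches, then build one DataFrame.
    return arrow_to_pandas(collect_as_arrow())

# Example with stub callables standing in for the real conversions:
print(to_pandas({"a": "int", "b": "string"},
                lambda: ["batch0"],
                lambda bufs: ("arrow", bufs),
                lambda: "slow"))       # ('arrow', ['batch0'])
print(to_pandas({"a": "array"},
                lambda: [],
                lambda bufs: ("arrow", bufs),
                lambda: "slow"))       # slow
```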
[jira] [Commented] (SPARK-15777) Catalog federation
[ https://issues.apache.org/jira/browse/SPARK-15777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15542995#comment-15542995 ] Frederick Reiss commented on SPARK-15777: - Thanks for posting this design doc. Some comments from a quick read-through: * I'm having some trouble following the description of optimizer rule pluggability in the current design doc. If the user is connected to multiple external data sources, will the optimizer rules for all those data sources be active all the time, even when running queries that only touch a single data source? If there are rules active from more than one external data source, in what order will those rules be evaluated? Will the order of evaluation (and hence the query plan) change if the user connects to the data sources in a different order? If you could post a link to the WIP implementation, it would probably clear up my questions here quickly. * Limitation number (2) seems pretty significant to me. At a site that uses multiple external data sources, users will need to include boilerplate at the beginning of every application to connect to all of the data sources' catalogs explicitly. That situation would be similar to the problem that motivated this JIRA in the first place. I think it would be good to include a mechanism for storing a persistent "meta-catalog" of external catalogs. * I don't see a mention of the namespace handling issues that the description for this JIRA mentions. With the current design, it looks like two external catalogs could define the same table name. Did I miss something, or is the plan to push out resolving namespace conflicts to a follow-on JIRA? > Catalog federation > -- > > Key: SPARK-15777 > URL: https://issues.apache.org/jira/browse/SPARK-15777 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Reynold Xin > Attachments: SparkFederationDesign.pdf > > > This is a ticket to track progress to support federating multiple external > catalogs. 
This would require establishing an API (similar to the current > ExternalCatalog API) for getting information about external catalogs, and the > ability to convert a table into a data source table. > As part of this, we would also need to be able to support more than a > two-level table identifier (database.table). At the very least we would need > a three level identifier for tables (catalog.database.table). A possible > direction is to support arbitrary level hierarchical namespaces similar to > file systems. > Once we have this implemented, we can convert the current Hive catalog > implementation into an external catalog that is "mounted" into an internal > catalog. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
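The three-level identifier described in the issue implies a resolution step that fills in defaults for omitted leading parts. A hedged, non-Spark sketch of what that might look like (default names are made up for illustration):

```python
# Illustrative sketch of resolving a multi-part table identifier of the form
# catalog.database.table, defaulting any omitted leading components. The
# default catalog/database names here are hypothetical, not Spark's.
def resolve_identifier(name, default_catalog="spark_catalog", default_db="default"):
    parts = name.split(".")
    if len(parts) == 3:
        return tuple(parts)                          # catalog.database.table
    if len(parts) == 2:
        return (default_catalog, parts[0], parts[1])  # database.table
    if len(parts) == 1:
        return (default_catalog, default_db, parts[0])  # bare table name
    raise ValueError("unsupported identifier: " + name)
```

Arbitrary-depth hierarchical namespaces would generalize this to a list of name parts rather than a fixed three-tuple.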
[jira] [Commented] (SPARK-10815) API design: data sources and sinks
[ https://issues.apache.org/jira/browse/SPARK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15530313#comment-15530313 ] Frederick Reiss commented on SPARK-10815: - Thanks for the clarification. Do you have a design to share at this point, or were you intending to solicit a design from members of the community? > API design: data sources and sinks > -- > > Key: SPARK-10815 > URL: https://issues.apache.org/jira/browse/SPARK-10815 > Project: Spark > Issue Type: Sub-task > Components: SQL, Streaming >Reporter: Reynold Xin > > The existing (in 2.0) source/sink interface for structured streaming depends > on RDDs. This dependency has two issues: > 1. The RDD interface is wide and difficult to stabilize across versions. This > is similar to point 1 in https://issues.apache.org/jira/browse/SPARK-15689. > Ideally, a source/sink implementation created for Spark 2.x should work in > Spark 10.x, assuming the JVM is still around. > 2. It is difficult to swap in/out a different execution engine. > The purpose of this ticket is to create a stable interface that addresses the > above two. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17631) Structured Streaming - Do we need to output results through http API?
[ https://issues.apache.org/jira/browse/SPARK-17631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15528403#comment-15528403 ] Frederick Reiss commented on SPARK-17631: - I can think of two use cases where one might want to have the option to send the output of a Structured Streaming application out over HTTP. The first use case involves using HTTP as a lightweight protocol for transferring bulk data out of a Structured Streaming query to a remote system. In this case, I would think the best approach would be to use a {{ForeachSink}}. That way, the HTTP connections would originate from the executors instead of having all the data pulled into the driver. I'm not sure that there's really a need to add a built-in sink type for this use case, as the existing {{ForeachSink}} provides most of what one would need. The second use case involves tunneling data out of the Spark cluster to an existing legacy system that speaks HTTP or HTTPS. For example, one might want a Structured Streaming application to send ML model updates to an application server via a secured connection. In this use case, I would think that it would be appropriate for the data to be pulled to the driver as in the PR. However, you would want a lot more configurability in any facility built into Spark; otherwise, there isn't much benefit over what a user could throw together in a few minutes with the (as yet unopened, but hopefully soon available) Sink API. > Structured Streaming - Do we need to output results through http API? 
> - > > Key: SPARK-17631 > URL: https://issues.apache.org/jira/browse/SPARK-17631 > Project: Spark > Issue Type: New Feature > Components: SQL, Streaming >Affects Versions: 2.0.0 >Reporter: zhangxinyu >Priority: Minor > > Streaming query results can be sent to an HTTP server through HTTP POST requests -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
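The first use case above can be sketched as a per-partition writer with the open/process/close lifecycle that foreach-style sinks follow. This is a hedged Python sketch, not Spark's actual API: the class name is made up, the method shapes only approximate the real contract, and the HTTP POST is injected as a callable so the sketch stays self-contained.

```python
# Sketch of sending rows over HTTP from the executors via a foreach-style
# sink. A real implementation would open an HTTP client in open() and issue
# the POST in process(); here `post` is an injected stub.
class HttpRowWriter:
    def __init__(self, endpoint, post):
        self.endpoint = endpoint
        self.post = post          # callable(url, body), e.g. wrapping an HTTP client
        self.sent = 0

    def open(self, partition_id):
        return True               # True means "go ahead and process this partition"

    def process(self, row):
        self.post(self.endpoint, row)  # one POST per row, from the executor
        self.sent += 1

    def close(self, error):
        pass                      # flush buffers / close connections here
```

Because each executor partition gets its own writer instance, the data never funnels through the driver, which is the main advantage over the driver-side approach in the PR.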
[jira] [Commented] (SPARK-16407) Allow users to supply custom StreamSinkProviders
[ https://issues.apache.org/jira/browse/SPARK-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15508260#comment-15508260 ] Frederick Reiss commented on SPARK-16407: - It looks like there is a general agreement here that Structured Streaming ought to have some way to direct the output of a streaming query to the Spark driver. But there is a disagreement about what shape that API should take. Can we have a discussion on this thread about what would be an acceptable design to meet this requirement, or would it be better to move over to the dev list? > Allow users to supply custom StreamSinkProviders > > > Key: SPARK-16407 > URL: https://issues.apache.org/jira/browse/SPARK-16407 > Project: Spark > Issue Type: Improvement > Components: Streaming >Reporter: holdenk > > The current DataStreamWriter allows users to specify a class name as format, > however it could be easier for people to directly pass in a specific provider > instance - e.g. for user equivalent of ForeachSink or other sink with > non-string parameters. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10815) API design: data sources and sinks
[ https://issues.apache.org/jira/browse/SPARK-10815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15505102#comment-15505102 ] Frederick Reiss commented on SPARK-10815: - I'm confused by the current description of this task. As far as I can see, the interface for sources and sinks in Structured Streaming has no direct dependencies on RDDs. More precisely, the four traits (Source, Sink, StreamSourceProvider, and StreamSinkProvider) that directly comprise the interface do not depend on the {{RDD}} class. The DataStreamWriter and DataFrameWriter classes that currently insulate users from Source, Sink, etc. also do not have any dependencies on {{RDD}}. Is this issue perhaps intended to reference limitations of the implementations of Datasets that require unnecessary direct access to the Dataset's internal RDD? > API design: data sources and sinks > -- > > Key: SPARK-10815 > URL: https://issues.apache.org/jira/browse/SPARK-10815 > Project: Spark > Issue Type: Sub-task > Components: SQL, Streaming >Reporter: Reynold Xin > > The existing (in 2.0) source/sink interface for structured streaming depends > on RDDs. This dependency has two issues: > 1. The RDD interface is wide and difficult to stabilize across versions. This > is similar to point 1 in https://issues.apache.org/jira/browse/SPARK-15689. > Ideally, a source/sink implementation created for Spark 2.x should work in > Spark 10.x, assuming the JVM is still around. > 2. It is difficult to swap in/out a different execution engine. > The purpose of this ticket is to create a stable interface that addresses the > above two. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-16407) Allow users to supply custom StreamSinkProviders
[ https://issues.apache.org/jira/browse/SPARK-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15494795#comment-15494795 ] Frederick Reiss commented on SPARK-16407: - With respect, I'm not seeing a whole lot of flux in those APIs, or in Structured Streaming in general. SPARK-8360, the JIRA for source and sink API design, has no description, no comments, and no changes since November of 2015. The Structured Streaming design doc hasn't changed appreciably since May. Even short-term tasks in Structured Streaming are very thin on the ground, and PR traffic is minimal. Is something going on behind closed doors that other members of the community are not aware of? > Allow users to supply custom StreamSinkProviders > > > Key: SPARK-16407 > URL: https://issues.apache.org/jira/browse/SPARK-16407 > Project: Spark > Issue Type: Improvement > Components: Streaming >Reporter: holdenk > > The current DataStreamWriter allows users to specify a class name as format, > however it could be easier for people to directly pass in a specific provider > instance - e.g. for user equivalent of ForeachSink or other sink with > non-string parameters. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17543) Missing log4j config file for tests in common/network-shuffle
Frederick Reiss created SPARK-17543: --- Summary: Missing log4j config file for tests in common/network-shuffle Key: SPARK-17543 URL: https://issues.apache.org/jira/browse/SPARK-17543 Project: Spark Issue Type: Bug Reporter: Frederick Reiss Priority: Trivial *This is a small starter task to help new contributors practice the pull request and code review process.* The Maven module {{common/network-shuffle}} does not have a log4j configuration file for its test cases. Usually these configuration files are located inside each module, in the directory {{src/test/resources}}. The missing configuration file leads to a scary-looking but harmless series of errors and stack traces in Spark build logs: {noformat} SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. log4j:ERROR Could not read configuration file from URL [file:src/test/resources/log4j.properties]. 
java.io.FileNotFoundException: src/test/resources/log4j.properties (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.(FileInputStream.java:146) at java.io.FileInputStream.(FileInputStream.java:101) at sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90) at sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188) at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557) at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526) at org.apache.log4j.LogManager.(LogManager.java:127) at org.apache.log4j.Logger.getLogger(Logger.java:104) at io.netty.util.internal.logging.Log4JLoggerFactory.newInstance(Log4JLoggerFactory.java:29) at io.netty.util.internal.logging.InternalLoggerFactory.newDefaultFactory(InternalLoggerFactory.java:46) at io.netty.util.internal.logging.InternalLoggerFactory.(InternalLoggerFactory.java:34) ... {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
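A fix along the lines described above would add a log4j configuration file at {{common/network-shuffle/src/test/resources/log4j.properties}}. The fragment below is a suggestion modeled on the configuration files other Spark modules keep in their {{src/test/resources}} directories; the exact appender settings and log file path are a convention, not a requirement:

```properties
# Minimal test logging configuration (suggested content).
# Route everything to a file so test console output stays clean.
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
```

With this file on the test classpath, log4j no longer falls back to the hard-coded {{file:src/test/resources/log4j.properties}} URL, and both the SLF4J NOP warning and the FileNotFoundException stack trace disappear from the build logs.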
[jira] [Created] (SPARK-17542) Compiler warning in UnsafeInMemorySorter class
Frederick Reiss created SPARK-17542: --- Summary: Compiler warning in UnsafeInMemorySorter class Key: SPARK-17542 URL: https://issues.apache.org/jira/browse/SPARK-17542 Project: Spark Issue Type: Bug Components: Spark Core Reporter: Frederick Reiss Priority: Trivial *This is a small starter task to help new contributors practice the pull request and code review process.* When building Spark with Java 8, there is a compiler warning in the class UnsafeInMemorySorter: {noformat} [WARNING] .../core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java:284: warning: [static] static method should be qualified by type name, TaskMemoryManager, instead of by an expression {noformat} Spark should compile without these kinds of warnings. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15487780#comment-15487780 ] Frederick Reiss commented on SPARK-15406: - +1 for taking the simple route in the short term. I'm seeing a lot of people at my work who are just kicking the tires on Structured Streaming at this point. Right now, we just need a Kafka connector that is correct and easy to use; performance and advanced functionality are of secondary importance. > Structured streaming support for consuming from Kafka > - > > Key: SPARK-15406 > URL: https://issues.apache.org/jira/browse/SPARK-15406 > Project: Spark > Issue Type: New Feature >Reporter: Cody Koeninger > > Structured streaming doesn't have support for kafka yet. I personally feel > like time based indexing would make for a much better interface, but it's > been pushed back to kafka 0.10.1 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17513) StreamExecution should discard unneeded metadata
Frederick Reiss created SPARK-17513: --- Summary: StreamExecution should discard unneeded metadata Key: SPARK-17513 URL: https://issues.apache.org/jira/browse/SPARK-17513 Project: Spark Issue Type: Sub-task Components: Streaming Reporter: Frederick Reiss The StreamExecution maintains a write-ahead log of batch metadata in order to allow repeating previously in-flight batches if the driver is restarted. StreamExecution does not garbage-collect or compact this log in any way. Since the log is implemented with HDFSMetadataLog, these files will consume memory on the HDFS NameNode. Specifically, each log file will consume about 300 bytes of NameNode memory (150 bytes for the inode and 150 bytes for the block of file contents; see [https://www.cloudera.com/documentation/enterprise/latest/topics/admin_nn_memory_config.html]). An application with a 100 msec batch interval will increase the NameNode's heap usage by about 250MB per day. There is also the matter of recovery. StreamExecution reads its entire log when restarting. This read operation will be very expensive if the log contains millions of entries spread over millions of files. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
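The 250MB/day figure in the description follows directly from the stated assumptions (one log file per batch, ~300 bytes of NameNode heap per file). The arithmetic, as a quick back-of-envelope check:

```python
# Back-of-envelope check of the NameNode heap growth claimed above,
# assuming one HDFSMetadataLog file per batch.
BYTES_PER_FILE = 150 + 150            # ~150 B inode entry + ~150 B block entry
BATCH_INTERVAL_SEC = 0.1              # 100 msec batch interval

batches_per_day = 24 * 60 * 60 / BATCH_INTERVAL_SEC   # 864,000 batches/day
heap_growth_mb_per_day = batches_per_day * BYTES_PER_FILE / 1e6
# 864,000 files/day * 300 B/file = ~259 MB/day, i.e. "about 250MB per day"
```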
[jira] [Updated] (SPARK-17475) HDFSMetadataLog should not leak CRC files
[ https://issues.apache.org/jira/browse/SPARK-17475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-17475: Description: When HDFSMetadataLog uses a log directory on a filesystem other than HDFS (i.e. NFS or the driver node's local filesystem), the class leaves orphan checksum (CRC) files in the log directory. The files have names that follow the pattern "..[long UUID hex string].tmp.crc". These files exist because HDFSMetaDataLog renames other temporary files without renaming the corresponding checksum files. There is one CRC file per batch, so the directory fills up quite quickly. I'm not certain, but this problem might also occur on certain versions of the HDFS APIs. was: When HDFSMetadataLog uses a log directory on a filesystem other than HDFS (i.e. NFS or the driver node's local filesystem), the class leaves orphan checksum (CRC) files in the log directory. The files have names that follow the pattern "..[long UUID hex string].tmp.crc". These files exist HDFSMetaDataLog renames other temporary files without renaming the corresponding checksum files. There is one CRC file per batch, so the directory fills up quite quickly. I'm not certain, but this problem might also occur on certain versions of the HDFS APIs. > HDFSMetadataLog should not leak CRC files > - > > Key: SPARK-17475 > URL: https://issues.apache.org/jira/browse/SPARK-17475 > Project: Spark > Issue Type: Sub-task > Components: Streaming >Reporter: Frederick Reiss > > When HDFSMetadataLog uses a log directory on a filesystem other than HDFS > (i.e. NFS or the driver node's local filesystem), the class leaves orphan > checksum (CRC) files in the log directory. The files have names that follow > the pattern "..[long UUID hex string].tmp.crc". These files exist because > HDFSMetaDataLog renames other temporary files without renaming the > corresponding checksum files. There is one CRC file per batch, so the > directory fills up quite quickly. 
> I'm not certain, but this problem might also occur on certain versions of the > HDFS APIs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17421) Warnings about "MaxPermSize" parameter when building with Maven and Java 8
[ https://issues.apache.org/jira/browse/SPARK-17421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477864#comment-15477864 ] Frederick Reiss commented on SPARK-17421: - Committer feedback on the first PR was that the necessary build changes to fix this issue are too substantial to be appropriate. Created a second PR that only documents the compile warnings as a known issue and explains why these warnings occur even when the user sets MAVEN_OPTS. > Warnings about "MaxPermSize" parameter when building with Maven and Java 8 > -- > > Key: SPARK-17421 > URL: https://issues.apache.org/jira/browse/SPARK-17421 > Project: Spark > Issue Type: Bug > Components: Build >Reporter: Frederick Reiss >Priority: Minor > > When building Spark with {{build/mvn}} or {{dev/run-tests}}, a Java warning > appears repeatedly on STDERR: > {{OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support > was removed in 8.0}} > This warning is due to {{build/mvn}} adding the {{-XX:MaxPermSize=512M}} > option to {{MAVEN_OPTS}}. When compiling with Java 7, this parameter is > essential. With Java 8, the parameter leads to the warning above. > Because {{build/mvn}} adds {{MaxPermSize}} to {{MAVEN_OPTS}}, even if that > environment variable doesn't contain the option, setting {{MAVEN_OPTS}} to a > string that does not contain {{MaxPermSize}} has no effect. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17475) HDFSMetadataLog should not leak CRC files
Frederick Reiss created SPARK-17475: --- Summary: HDFSMetadataLog should not leak CRC files Key: SPARK-17475 URL: https://issues.apache.org/jira/browse/SPARK-17475 Project: Spark Issue Type: Sub-task Components: Streaming Reporter: Frederick Reiss When HDFSMetadataLog uses a log directory on a filesystem other than HDFS (i.e. NFS or the driver node's local filesystem), the class leaves orphan checksum (CRC) files in the log directory. The files have names that follow the pattern "..[long UUID hex string].tmp.crc". These files exist because HDFSMetaDataLog renames other temporary files without renaming the corresponding checksum files. There is one CRC file per batch, so the directory fills up quite quickly. I'm not certain, but this problem might also occur on certain versions of the HDFS APIs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls
[ https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-17386: Description: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will sit in a loop calling {{getOffset()}} every 10 msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this rapid polling leads to excessive CPU usage. In a production environment, this overhead could disrupt critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If no data has arrived, Spark will list the directory's contents up to 100 times per second. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. was: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. 
The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If no data has arrived, Spark will list the directory's contents up to 100 times per second. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. > Default polling and trigger intervals cause excessive RPC calls > --- > > Key: SPARK-17386 > URL: https://issues.apache.org/jira/browse/SPARK-17386 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Frederick Reiss >Priority: Minor > > The default trigger interval for a Structured Streaming query is > {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". > When the trigger is set to this default value, the scheduler in > {{StreamExecution}} will sit in a loop calling {{getOffset()}} every 10 msec > (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until new > data arrives. > In test cases, where most of the sources are {{MemoryStream}} or > {{TextSocketSource}}, this rapid polling leads to excessive CPU usage. > In a production environment, this overhead could disrupt critical > infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} > or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of > {{FileStreamSource}} performs a directory listing of an HDFS directory. If no > data has arrived, Spark will list the directory's contents up to 100 times > per second. This overhead could disrupt service to other systems using HDFS, > including Spark itself. A similar situation will exist with the Kafka source, > the {{getOffset()}} method of which will presumably call Kafka's > {{Consumer.poll()}} method. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls
[ https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-17386: Description: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If no data has arrived, Spark will list the directory's contents up to 100 times per second. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. was: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. 
The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will list an HDFS directory's contents up to 100 times per second. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. > Default polling and trigger intervals cause excessive RPC calls > --- > > Key: SPARK-17386 > URL: https://issues.apache.org/jira/browse/SPARK-17386 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Frederick Reiss >Priority: Minor > > The default trigger interval for a Structured Streaming query is > {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". > When the trigger is set to this default value, the scheduler in > {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every > 10 msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} > until new data arrives. > In test cases, where most of the sources are {{MemoryStream}} or > {{TextSocketSource}}, this spinning leads to excessive CPU usage. > In a production environment, this spinning could take down critical > infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} > or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of > {{FileStreamSource}} performs a directory listing of an HDFS directory. If no > data has arrived, Spark will list the directory's contents up to 100 times > per second. This overhead could disrupt service to other systems using HDFS, > including Spark itself. A similar situation will exist with the Kafka source, > the {{getOffset()}} method of which will presumably call Kafka's > {{Consumer.poll()}} method. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls
[ https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-17386: Description: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will list an HDFS directory's contents up to 100 times per second. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. was: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 msec (the default value of on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. 
Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will make hundreds of RPC calls per second to the HDFS NameNode. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. > Default polling and trigger intervals cause excessive RPC calls > --- > > Key: SPARK-17386 > URL: https://issues.apache.org/jira/browse/SPARK-17386 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Frederick Reiss >Priority: Minor > > The default trigger interval for a Structured Streaming query is > {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". > When the trigger is set to this default value, the scheduler in > {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every > 10 msec (the default value of STREAMING_POLLING_DELAY) on every {{Source}} > until new data arrives. > In test cases, where most of the sources are {{MemoryStream}} or > {{TextSocketSource}}, this spinning leads to excessive CPU usage. > In a production environment, this spinning could take down critical > infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} > or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of > {{FileStreamSource}} performs a directory listing of an HDFS directory. If > the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark > will list an HDFS directory's contents up to 100 times per second. This > overhead could disrupt service to other systems using HDFS, including Spark > itself. 
A similar situation will exist with the Kafka source, the > {{getOffset()}} method of which will presumably call Kafka's > {{Consumer.poll()}} method. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
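The wait loop described above can be sketched in plain Python (a hypothetical simplification of the {{StreamExecution}} scheduler, not Spark's actual code): with the default 10 msec polling delay, an idle source's {{getOffset()}} is invoked on the order of 100 times per second.

```python
import time

class FakeSource:
    """Stand-in for a streaming Source whose getOffset() would be a real RPC
    (e.g. an HDFS directory listing in FileStreamSource)."""
    def __init__(self):
        self.get_offset_calls = 0

    def get_offset(self):
        self.get_offset_calls += 1
        return None  # no new data has arrived yet

def poll_until_data(source, polling_delay_sec=0.01, max_wait_sec=0.2):
    """Hypothetical simplification of the scheduler's wait loop: with a
    ProcessingTime(0) trigger it just re-polls every polling_delay_sec
    until the source reports a new offset."""
    deadline = time.monotonic() + max_wait_sec
    while time.monotonic() < deadline:
        if source.get_offset() is not None:
            return True
        time.sleep(polling_delay_sec)
    return False

source = FakeSource()
poll_until_data(source)
# Over a 0.2 s window with a 10 ms delay, roughly 20 polls fire; over a
# full second that is on the order of 100 RPCs against an idle source.
print(source.get_offset_calls)
```

Each of those polls is cheap for {{MemoryStream}} but becomes a NameNode RPC or a Kafka broker round-trip for production sources, which is why a longer trigger or polling interval matters.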
[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls
[ https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-17386: Description: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 msec (the default value of on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will make hundreds of RPC calls per second to the HDFS NameNode. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. was: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 msec on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. 
The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will make hundreds of RPC calls per second to the HDFS NameNode. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. > Default polling and trigger intervals cause excessive RPC calls > --- > > Key: SPARK-17386 > URL: https://issues.apache.org/jira/browse/SPARK-17386 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Frederick Reiss >Priority: Minor > > The default trigger interval for a Structured Streaming query is > {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". > When the trigger is set to this default value, the scheduler in > {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every > 10 msec (the default value of on every {{Source}} until new data arrives. > In test cases, where most of the sources are {{MemoryStream}} or > {{TextSocketSource}}, this spinning leads to excessive CPU usage. > In a production environment, this spinning could take down critical > infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} > or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of > {{FileStreamSource}} performs a directory listing of an HDFS directory. If > the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark > will make hundreds of RPC calls per second to the HDFS NameNode. This > overhead could disrupt service to other systems using HDFS, including Spark > itself. A similar situation will exist with the Kafka source, the > {{getOffset()}} method of which will presumably call Kafka's > {{Consumer.poll()}} method. 
[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls
[ https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-17386: Priority: Minor (was: Major) > Default polling and trigger intervals cause excessive RPC calls > --- > > Key: SPARK-17386 > URL: https://issues.apache.org/jira/browse/SPARK-17386 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Frederick Reiss >Priority: Minor > > The default trigger interval for a Structured Streaming query is > {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". > When the trigger is set to this default value, the scheduler in > {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every > 10 msec on every {{Source}} until new data arrives. > In test cases, where most of the sources are {{MemoryStream}} or > {{TextSocketSource}}, this spinning leads to excessive CPU usage. > In a production environment, this spinning could take down critical > infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} > or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of > {{FileStreamSource}} performs a directory listing of an HDFS directory. If > the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark > will make hundreds of RPC calls per second to the HDFS NameNode. This > overhead could disrupt service to other systems using HDFS, including Spark > itself. A similar situation will exist with the Kafka source, the > {{getOffset()}} method of which will presumably call Kafka's > {{Consumer.poll()}} method. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17386) Default polling and trigger intervals cause excessive RPC calls
[ https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-17386: Summary: Default polling and trigger intervals cause excessive RPC calls (was: Default trigger interval causes excessive RPC calls) > Default polling and trigger intervals cause excessive RPC calls > --- > > Key: SPARK-17386 > URL: https://issues.apache.org/jira/browse/SPARK-17386 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Frederick Reiss > > The default trigger interval for a Structured Streaming query is > {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". > When the trigger is set to this default value, the scheduler in > {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every > 10 msec on every {{Source}} until new data arrives. > In test cases, where most of the sources are {{MemoryStream}} or > {{TextSocketSource}}, this spinning leads to excessive CPU usage. > In a production environment, this spinning could take down critical > infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} > or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of > {{FileStreamSource}} performs a directory listing of an HDFS directory. If > the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark > will make hundreds of RPC calls per second to the HDFS NameNode. This > overhead could disrupt service to other systems using HDFS, including Spark > itself. A similar situation will exist with the Kafka source, the > {{getOffset()}} method of which will presumably call Kafka's > {{Consumer.poll()}} method. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17386) Default trigger interval causes excessive RPC calls
[ https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-17386: Description: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every 10 msec on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will make hundreds of RPC calls per second to the HDFS NameNode. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. was: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. 
The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will make hundreds of RPC calls per second to the HDFS NameNode. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. > Default trigger interval causes excessive RPC calls > --- > > Key: SPARK-17386 > URL: https://issues.apache.org/jira/browse/SPARK-17386 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Frederick Reiss > > The default trigger interval for a Structured Streaming query is > {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". > When the trigger is set to this default value, the scheduler in > {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} every > 10 msec on every {{Source}} until new data arrives. > In test cases, where most of the sources are {{MemoryStream}} or > {{TextSocketSource}}, this spinning leads to excessive CPU usage. > In a production environment, this spinning could take down critical > infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} > or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of > {{FileStreamSource}} performs a directory listing of an HDFS directory. If > the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark > will make hundreds of RPC calls per second to the HDFS NameNode. This > overhead could disrupt service to other systems using HDFS, including Spark > itself. A similar situation will exist with the Kafka source, the > {{getOffset()}} method of which will presumably call Kafka's > {{Consumer.poll()}} method. 
[jira] [Created] (SPARK-17421) Warnings about "MaxPermSize" parameter when building with Maven and Java 8
Frederick Reiss created SPARK-17421: --- Summary: Warnings about "MaxPermSize" parameter when building with Maven and Java 8 Key: SPARK-17421 URL: https://issues.apache.org/jira/browse/SPARK-17421 Project: Spark Issue Type: Bug Components: Build Reporter: Frederick Reiss Priority: Minor When building Spark with {{build/mvn}} or {{dev/run-tests}}, a Java warning appears repeatedly on STDERR: {{OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0}} This warning is due to {{build/mvn}} adding the {{-XX:MaxPermSize=512M}} option to {{MAVEN_OPTS}}. When compiling with Java 7, this parameter is essential. With Java 8, the parameter triggers the warning above. Because {{build/mvn}} adds {{MaxPermSize}} to {{MAVEN_OPTS}} even when that environment variable doesn't already contain the option, setting {{MAVEN_OPTS}} yourself to a string without {{MaxPermSize}} has no effect.
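One possible direction (an assumption, not the committed fix) is for {{build/mvn}} to add the flag only when the detected JVM predates Java 8. A shell sketch of that version check, using a hardcoded sample of {{java -version}} output rather than invoking a real JVM:

```shell
# Sketch of a possible guard for build/mvn (an assumption, not the actual
# fix): parse the JVM major version and append MaxPermSize only before
# Java 8, which removed the PermGen space entirely.
java_version_line='java version "1.8.0_102"'   # hardcoded sample output
major=$(printf '%s\n' "$java_version_line" | sed -E 's/.*"1\.([0-9]+).*/\1/')

MAVEN_OPTS="-Xmx2g"
if [ "$major" -lt 8 ]; then
  MAVEN_OPTS="$MAVEN_OPTS -XX:MaxPermSize=512M"
fi
printf '%s\n' "$MAVEN_OPTS"   # Java 8 sample: -Xmx2g (no MaxPermSize)
```

With a Java 7 version string the same check would append the flag, so users who export their own {{MAVEN_OPTS}} would no longer have it silently re-added on Java 8.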
[jira] [Updated] (SPARK-17386) Default trigger interval causes excessive RPC calls
[ https://issues.apache.org/jira/browse/SPARK-17386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-17386: Description: The default trigger interval for a Structured Streaming query is {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} on every {{Source}} until new data arrives. In test cases, where most of the sources are {{MemoryStream}} or {{TextSocketSource}}, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of {{FileStreamSource}} performs a directory listing of an HDFS directory. If the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark will make hundreds of RPC calls per second to the HDFS NameNode. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the {{getOffset()}} method of which will presumably call Kafka's {{Consumer.poll()}} method. was: The default trigger interval for a Structured Streaming query is `ProcessingTime(0)`, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in `StreamExecution` will spin in a tight loop calling `getOffset()` on every `Source` until new data arrives. In test cases, where most of the sources are `MemoryStream` or `TextSocketSource`, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be `FileStreamSource` or the not-yet-written Kafka 0.10 Source. 
The `getOffset()` method of `FileStreamSource` performs a directory listing of an HDFS directory. If the scheduler calls `FileStreamSource.getOffset()` in a tight loop, Spark will make several hundred RPC calls per second to the HDFS NameNode. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the `getOffset()` method of which will presumably call Kafka's `Consumer.poll()` method. > Default trigger interval causes excessive RPC calls > --- > > Key: SPARK-17386 > URL: https://issues.apache.org/jira/browse/SPARK-17386 > Project: Spark > Issue Type: Bug > Components: Streaming >Reporter: Frederick Reiss > > The default trigger interval for a Structured Streaming query is > {{ProcessingTime(0)}}, i.e. "trigger new microbatches as fast as possible". > When the trigger is set to this default value, the scheduler in > {{StreamExecution}} will spin in a tight loop calling {{getOffset()}} on > every {{Source}} until new data arrives. > In test cases, where most of the sources are {{MemoryStream}} or > {{TextSocketSource}}, this spinning leads to excessive CPU usage. > In a production environment, this spinning could take down critical > infrastructure. Most sources in Spark clusters will be {{FileStreamSource}} > or the not-yet-written Kafka 0.10 Source. The {{getOffset()}} method of > {{FileStreamSource}} performs a directory listing of an HDFS directory. If > the scheduler calls {{FileStreamSource.getOffset()}} in a tight loop, Spark > will make hundreds of RPC calls per second to the HDFS NameNode. This > overhead could disrupt service to other systems using HDFS, including Spark > itself. A similar situation will exist with the Kafka source, the > {{getOffset()}} method of which will presumably call Kafka's > {{Consumer.poll()}} method. 
[jira] [Created] (SPARK-17386) Default trigger interval causes excessive RPC calls
Frederick Reiss created SPARK-17386: --- Summary: Default trigger interval causes excessive RPC calls Key: SPARK-17386 URL: https://issues.apache.org/jira/browse/SPARK-17386 Project: Spark Issue Type: Bug Components: Streaming Reporter: Frederick Reiss The default trigger interval for a Structured Streaming query is `ProcessingTime(0)`, i.e. "trigger new microbatches as fast as possible". When the trigger is set to this default value, the scheduler in `StreamExecution` will spin in a tight loop calling `getOffset()` on every `Source` until new data arrives. In test cases, where most of the sources are `MemoryStream` or `TextSocketSource`, this spinning leads to excessive CPU usage. In a production environment, this spinning could take down critical infrastructure. Most sources in Spark clusters will be `FileStreamSource` or the not-yet-written Kafka 0.10 Source. The `getOffset()` method of `FileStreamSource` performs a directory listing of an HDFS directory. If the scheduler calls `FileStreamSource.getOffset()` in a tight loop, Spark will make several hundred RPC calls per second to the HDFS NameNode. This overhead could disrupt service to other systems using HDFS, including Spark itself. A similar situation will exist with the Kafka source, the `getOffset()` method of which will presumably call Kafka's `Consumer.poll()` method. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-13534) Implement Apache Arrow serializer for Spark DataFrame for use in DataFrame.toPandas
[ https://issues.apache.org/jira/browse/SPARK-13534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15456990#comment-15456990 ] Frederick Reiss commented on SPARK-13534: - [~wesmckinn], are you planning to work on this issue soon? > Implement Apache Arrow serializer for Spark DataFrame for use in > DataFrame.toPandas > --- > > Key: SPARK-13534 > URL: https://issues.apache.org/jira/browse/SPARK-13534 > Project: Spark > Issue Type: New Feature > Components: PySpark >Reporter: Wes McKinney > > The current code path for accessing Spark DataFrame data in Python using > PySpark passes through an inefficient serialization-deserialization process > that I've examined at a high level here: > https://gist.github.com/wesm/0cb5531b1c2e346a0007. Currently, RDD[Row] > objects are being deserialized in pure Python as a list of tuples, which are > then converted to pandas.DataFrame using its {{from_records}} alternate > constructor. This also uses a large amount of memory. > For flat (no nested types) schemas, the Apache Arrow memory layout > (https://github.com/apache/arrow/tree/master/format) can be deserialized to > {{pandas.DataFrame}} objects with comparatively small overhead compared with > memcpy / system memory bandwidth -- Arrow's bitmasks must be examined, > replacing the corresponding null values with pandas's sentinel values (None > or NaN as appropriate). > I will be contributing patches to Arrow in the coming weeks for converting > between Arrow and pandas in the general case, so if Spark can send Arrow > memory to PySpark, we will hopefully be able to increase the Python data > access throughput by an order of magnitude or more. I propose to add a new > serializer for Spark DataFrame and a new method that can be invoked from > PySpark to request an Arrow memory-layout byte stream, prefixed by a data > header indicating array buffer offsets and sizes. 
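The bitmask-driven null handling described in SPARK-13534 can be illustrated in pure Python (an illustrative sketch, not Arrow's or pandas's actual API): each element of an Arrow column carries a validity bit, and cleared bits map to a null sentinel on the pandas side.

```python
# Illustrative sketch only: Arrow columns pair a values buffer with a
# validity bitmask, one bit per element (1 = valid, little-endian bit
# order within each byte). Converting to pandas means replacing the
# masked-out slots with a null sentinel such as None or NaN.
def decode_with_validity(values, bitmask_bytes):
    out = []
    for i, v in enumerate(values):
        bit = (bitmask_bytes[i // 8] >> (i % 8)) & 1
        out.append(v if bit else None)
    return out

# 0b00000101 -> elements 0 and 2 are valid, element 1 is null
print(decode_with_validity([5, 0, 6], bytes([0b00000101])))  # [5, None, 6]
```

Real Arrow-to-pandas conversion does this scan in native code over whole buffers, which is where the "comparatively small overhead" claim in the issue comes from.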
[jira] [Closed] (SPARK-17343) Prerequisites for Kafka 0.8 support in Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss closed SPARK-17343. --- Resolution: Won't Fix There will be no Kafka 0.8 connectors for Structured Streaming. See comments on SPARK-15406. > Prerequisites for Kafka 0.8 support in Structured Streaming > --- > > Key: SPARK-17343 > URL: https://issues.apache.org/jira/browse/SPARK-17343 > Project: Spark > Issue Type: Sub-task > Components: Streaming >Reporter: Frederick Reiss > > This issue covers any API changes, refactoring, and utility classes/methods > that are necessary to make it possible to implement support for Kafka 0.8 > sources and sinks in Structured Streaming. > From a quick glance, it looks like some refactoring of the existing state > storage mechanism in the Kafka 0.8 DStream may suffice. But there might be > some additional groundwork in other areas. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-17344) Kafka 0.8 support for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss closed SPARK-17344. --- Resolution: Won't Fix There will be no Kafka 0.8 connectors for Structured Streaming. See comments on SPARK-15406. > Kafka 0.8 support for Structured Streaming > -- > > Key: SPARK-17344 > URL: https://issues.apache.org/jira/browse/SPARK-17344 > Project: Spark > Issue Type: Sub-task > Components: Streaming >Reporter: Frederick Reiss > > Design and implement Kafka 0.8-based sources and sinks for Structured > Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15454320#comment-15454320 ] Frederick Reiss commented on SPARK-15406: - Thanks for clearing that up. Marking SPARK-17343 and SPARK-17344 as "won't fix". > Structured streaming support for consuming from Kafka > - > > Key: SPARK-15406 > URL: https://issues.apache.org/jira/browse/SPARK-15406 > Project: Spark > Issue Type: New Feature >Reporter: Cody Koeninger > > Structured streaming doesn't have support for kafka yet. I personally feel > like time based indexing would make for a much better interface, but it's > been pushed back to kafka 0.10.1 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15453749#comment-15453749 ] Frederick Reiss commented on SPARK-15406: - WRT Kafka 0.8: I'm under the impression that there is a significant number of Spark users who are still stuck with Kafka 0.8 or 0.9. Kafka sits between multiple systems, so upgrades to production Kafka installations can be challenging. Are other people monitoring this JIRA seeing a similar situation, or are versions before 0.10 not in widespread use any more? If older Kafka releases aren't relevant, then we should probably deprecate the entire spark-streaming-kafka-0-8 component. > Structured streaming support for consuming from Kafka > - > > Key: SPARK-15406 > URL: https://issues.apache.org/jira/browse/SPARK-15406 > Project: Spark > Issue Type: New Feature >Reporter: Cody Koeninger > > Structured streaming doesn't have support for kafka yet. I personally feel > like time based indexing would make for a much better interface, but it's > been pushed back to kafka 0.10.1 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15453059#comment-15453059 ] Frederick Reiss commented on SPARK-15406: - I've taken the liberty of splitting this issue into several steps to facilitate collaboration. [~c...@koeninger.org], would you mind typing up any thoughts you have about how these steps should move forward, as well as if there are smaller subtasks that others can help out with? > Structured streaming support for consuming from Kafka > - > > Key: SPARK-15406 > URL: https://issues.apache.org/jira/browse/SPARK-15406 > Project: Spark > Issue Type: New Feature >Reporter: Cody Koeninger > > Structured streaming doesn't have support for kafka yet. I personally feel > like time based indexing would make for a much better interface, but it's > been pushed back to kafka 0.10.1 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15406) Structured streaming support for consuming from Kafka
[ https://issues.apache.org/jira/browse/SPARK-15406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15453047#comment-15453047 ] Frederick Reiss commented on SPARK-15406: - There has been some discussion of this JIRA on the mailing list; see [http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-with-Kafka-sources-sinks-tt18645.html] > Structured streaming support for consuming from Kafka > - > > Key: SPARK-15406 > URL: https://issues.apache.org/jira/browse/SPARK-15406 > Project: Spark > Issue Type: New Feature >Reporter: Cody Koeninger > > Structured streaming doesn't have support for kafka yet. I personally feel > like time based indexing would make for a much better interface, but it's > been pushed back to kafka 0.10.1 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17346) Kafka 0.10 support in Structured Streaming
Frederick Reiss created SPARK-17346: --- Summary: Kafka 0.10 support in Structured Streaming Key: SPARK-17346 URL: https://issues.apache.org/jira/browse/SPARK-17346 Project: Spark Issue Type: Sub-task Components: Streaming Reporter: Frederick Reiss Implement Kafka 0.10-based sources and sinks for Structured Streaming.
[jira] [Created] (SPARK-17345) Prerequisites for Kafka 0.10 support in Structured Streaming
Frederick Reiss created SPARK-17345: --- Summary: Prerequisites for Kafka 0.10 support in Structured Streaming Key: SPARK-17345 URL: https://issues.apache.org/jira/browse/SPARK-17345 Project: Spark Issue Type: Sub-task Components: Streaming Reporter: Frederick Reiss This issue covers any API changes, refactoring, and utility classes/methods that are necessary to make it possible to implement support for Kafka 0.10 sources and sinks in Structured Streaming. At a minimum, the changes in SPARK-16963 will be needed in order for the Kafka commit protocol to work. Given that KIP-33 ("Add a time based log index") is not yet in place, it may be necessary to make additional API changes in Spark for commit to work efficiently. Some refactoring of the existing KafkaRDD class is probably in order also.
[jira] [Created] (SPARK-17344) Kafka 0.8 support for Structured Streaming
Frederick Reiss created SPARK-17344: --- Summary: Kafka 0.8 support for Structured Streaming Key: SPARK-17344 URL: https://issues.apache.org/jira/browse/SPARK-17344 Project: Spark Issue Type: Sub-task Components: Streaming Reporter: Frederick Reiss Design and implement Kafka 0.8-based sources and sinks for Structured Streaming.
[jira] [Created] (SPARK-17343) Prerequisites for Kafka 0.8 support in Structured Streaming
Frederick Reiss created SPARK-17343: --- Summary: Prerequisites for Kafka 0.8 support in Structured Streaming Key: SPARK-17343 URL: https://issues.apache.org/jira/browse/SPARK-17343 Project: Spark Issue Type: Sub-task Components: Streaming Reporter: Frederick Reiss This issue covers any API changes, refactoring, and utility classes/methods that are necessary to make it possible to implement support for Kafka 0.8 sources and sinks in Structured Streaming. From a quick glance, it looks like some refactoring of the existing state storage mechanism in the Kafka 0.8 DStream may suffice. But there might be some additional groundwork in other areas.
[jira] [Commented] (SPARK-17267) Long running structured streaming requirements
[ https://issues.apache.org/jira/browse/SPARK-17267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15452993#comment-15452993 ] Frederick Reiss commented on SPARK-17267: - Converted SPARK-16963 to a subtask of this JIRA. > Long running structured streaming requirements > -- > > Key: SPARK-17267 > URL: https://issues.apache.org/jira/browse/SPARK-17267 > Project: Spark > Issue Type: Bug > Components: SQL, Streaming >Reporter: Reynold Xin >Priority: Blocker > > This is an umbrella ticket to track various things that are required in order > to have the engine for structured streaming run non-stop in production.
[jira] [Updated] (SPARK-16963) Change Source API so that sources do not need to keep unbounded state
[ https://issues.apache.org/jira/browse/SPARK-16963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frederick Reiss updated SPARK-16963: Issue Type: Sub-task (was: Improvement) Parent: SPARK-17267 > Change Source API so that sources do not need to keep unbounded state > - > > Key: SPARK-16963 > URL: https://issues.apache.org/jira/browse/SPARK-16963 > Project: Spark > Issue Type: Sub-task > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Frederick Reiss > > The version of the Source API in Spark 2.0.0 defines a single getBatch() > method for fetching records from the source, with the following Scaladoc > comments defining the semantics: > {noformat} > /** > * Returns the data that is between the offsets (`start`, `end`]. When > `start` is `None` then > * the batch should begin with the first available record. This method must > always return the > * same data for a particular `start` and `end` pair. > */ > def getBatch(start: Option[Offset], end: Offset): DataFrame > {noformat} > These semantics mean that a Source must retain all past history for the > stream that it backs. Further, a Source is also required to retain this data > across restarts of the process where the Source is instantiated, even when > the Source is restarted on a different machine. > These restrictions make it difficult to implement the Source API, as any > implementation requires potentially unbounded amounts of distributed storage. > See the mailing list thread at > [http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-td18551.html] > for more information. > This JIRA will cover augmenting the Source API with an additional callback > that will allow Structured Streaming scheduler to notify the source when it > is safe to discard buffered data. 
[jira] [Created] (SPARK-17303) dev/run-tests fails if spark-warehouse directory exists
Frederick Reiss created SPARK-17303: --- Summary: dev/run-tests fails if spark-warehouse directory exists Key: SPARK-17303 URL: https://issues.apache.org/jira/browse/SPARK-17303 Project: Spark Issue Type: Bug Components: Build Reporter: Frederick Reiss Priority: Minor The script dev/run-tests, which is intended for verifying the correctness of pull requests, runs Apache RAT to check for missing Apache license headers. Later, the script does a full compile/package/test sequence. The script as currently written works fine the first time. But the second time it runs, the Apache RAT checks fail due to the presence of the directory spark-warehouse, which the script indirectly creates during its regression test run.
[jira] [Commented] (SPARK-16963) Change Source API so that sources do not need to keep unbounded state
[ https://issues.apache.org/jira/browse/SPARK-16963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440598#comment-15440598 ] Frederick Reiss commented on SPARK-16963: - Updated the pull request to address some conflicting changes in the main branch and to address some minor review comments. Changed the name of `getMinOffset` to `lastCommittedOffset` per Prashant's comments. Changes are still ready for review. > Change Source API so that sources do not need to keep unbounded state > - > > Key: SPARK-16963 > URL: https://issues.apache.org/jira/browse/SPARK-16963 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Frederick Reiss > > The version of the Source API in Spark 2.0.0 defines a single getBatch() > method for fetching records from the source, with the following Scaladoc > comments defining the semantics: > {noformat} > /** > * Returns the data that is between the offsets (`start`, `end`]. When > `start` is `None` then > * the batch should begin with the first available record. This method must > always return the > * same data for a particular `start` and `end` pair. > */ > def getBatch(start: Option[Offset], end: Offset): DataFrame > {noformat} > These semantics mean that a Source must retain all past history for the > stream that it backs. Further, a Source is also required to retain this data > across restarts of the process where the Source is instantiated, even when > the Source is restarted on a different machine. > These restrictions make it difficult to implement the Source API, as any > implementation requires potentially unbounded amounts of distributed storage. > See the mailing list thread at > [http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-td18551.html] > for more information. 
> This JIRA will cover augmenting the Source API with an additional callback > that will allow Structured Streaming scheduler to notify the source when it > is safe to discard buffered data.
[jira] [Commented] (SPARK-17165) FileStreamSource should not track the list of seen files indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15439391#comment-15439391 ] Frederick Reiss commented on SPARK-17165: - This problem is actually deeper than just FileStreamSource. With the current version of the Source trait, *every* source needs to keep infinite state. [~scrapco...@gmail.com] ran into that issue while writing a connector for MQTT. I opened SPARK-16963 a few weeks back to cover the core issue with the Stream trait. My open PR for that JIRA (https://github.com/apache/spark/pull/14553) has a fair amount of overlap with the PR here and with the one in SPARK-17235. Can we merge our efforts here to make a single sequence of small, easy-to-review change sets that will resolve these state management issues across all sources? I'm thinking that we can create a single JIRA (or reuse one of the existing ones) to cover "keep only bounded state for Structured Streaming data sources", then divide that JIRA into the following tasks: # Add a method to `Source` to trigger cleaning of processed data # Add a method to `HDFSMetadataLog` to clean out processed metadata # Implement garbage collection of old data (metadata and files) in `FileStreamSource` # Implement garbage collection of old data in `MemoryStream` and other stubs of Source # Modify the scheduler (`StreamExecution`) so that it triggers garbage collection of data and metadata Thoughts? > FileStreamSource should not track the list of seen files indefinitely > - > > Key: SPARK-17165 > URL: https://issues.apache.org/jira/browse/SPARK-17165 > Project: Spark > Issue Type: Bug > Components: SQL, Streaming >Reporter: Reynold Xin > > FileStreamSource currently tracks all the files seen indefinitely, which > means it can run out of memory or overflow. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
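The metadata-cleanup step in the task list above (a purge-style method on `HDFSMetadataLog`, driven by the scheduler) can be illustrated with a toy in-memory stand-in. The real `HDFSMetadataLog` is a Scala class backed by files on HDFS; the class and method names here are hypothetical, chosen only to mirror the proposal:

```python
# In-memory sketch of the proposed metadata cleanup; not Spark's actual
# HDFSMetadataLog API. The scheduler would call purge() once a batch is
# fully processed.
class MetadataLog:
    def __init__(self):
        self._entries = {}  # batch_id -> metadata

    def add(self, batch_id, metadata):
        self._entries[batch_id] = metadata

    def purge(self, threshold_batch_id):
        """Drop metadata for every batch strictly below the threshold."""
        for batch_id in [b for b in self._entries if b < threshold_batch_id]:
            del self._entries[batch_id]

log = MetadataLog()
for i in range(5):
    log.add(i, "batch-%d" % i)
log.purge(3)
assert sorted(log._entries) == [3, 4]  # only unprocessed batches remain
```

The same purge pattern would apply per task: sources drop buffered data, the metadata log drops old batch entries, and `StreamExecution` decides when each threshold is safe.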
[jira] [Commented] (SPARK-16963) Change Source API so that sources do not need to keep unbounded state
[ https://issues.apache.org/jira/browse/SPARK-16963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15431779#comment-15431779 ] Frederick Reiss commented on SPARK-16963: - The proposed changes in the attached PR are now ready for review. [~marmbrus] can you please have a look at your convenience? [~prashant_] can you also please have a look with a particular focus on whether the changes fit with the MQTT connector? > Change Source API so that sources do not need to keep unbounded state > - > > Key: SPARK-16963 > URL: https://issues.apache.org/jira/browse/SPARK-16963 > Project: Spark > Issue Type: Improvement > Components: Streaming >Affects Versions: 2.0.0 >Reporter: Frederick Reiss > > The version of the Source API in Spark 2.0.0 defines a single getBatch() > method for fetching records from the source, with the following Scaladoc > comments defining the semantics: > {noformat} > /** > * Returns the data that is between the offsets (`start`, `end`]. When > `start` is `None` then > * the batch should begin with the first available record. This method must > always return the > * same data for a particular `start` and `end` pair. > */ > def getBatch(start: Option[Offset], end: Offset): DataFrame > {noformat} > These semantics mean that a Source must retain all past history for the > stream that it backs. Further, a Source is also required to retain this data > across restarts of the process where the Source is instantiated, even when > the Source is restarted on a different machine. > These restrictions make it difficult to implement the Source API, as any > implementation requires potentially unbounded amounts of distributed storage. > See the mailing list thread at > [http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-td18551.html] > for more information. 
> This JIRA will cover augmenting the Source API with an additional callback > that will allow Structured Streaming scheduler to notify the source when it > is safe to discard buffered data.
[jira] [Created] (SPARK-16963) Change Source API so that sources do not need to keep unbounded state
Frederick Reiss created SPARK-16963: --- Summary: Change Source API so that sources do not need to keep unbounded state Key: SPARK-16963 URL: https://issues.apache.org/jira/browse/SPARK-16963 Project: Spark Issue Type: Improvement Components: Streaming Affects Versions: 2.0.0 Reporter: Frederick Reiss The version of the Source API in Spark 2.0.0 defines a single getBatch() method for fetching records from the source, with the following Scaladoc comments defining the semantics: {noformat} /** * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then * the batch should begin with the first available record. This method must always return the * same data for a particular `start` and `end` pair. */ def getBatch(start: Option[Offset], end: Offset): DataFrame {noformat} These semantics mean that a Source must retain all past history for the stream that it backs. Further, a Source is also required to retain this data across restarts of the process where the Source is instantiated, even when the Source is restarted on a different machine. These restrictions make it difficult to implement the Source API, as any implementation requires potentially unbounded amounts of distributed storage. See the mailing list thread at [http://apache-spark-developers-list.1001551.n3.nabble.com/Source-API-requires-unbounded-distributed-storage-td18551.html] for more information. This JIRA will cover augmenting the Source API with an additional callback that will allow Structured Streaming scheduler to notify the source when it is safe to discard buffered data. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
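To make the unbounded-state problem concrete: under the `getBatch` contract quoted above, a source can never discard a record, because any `(start, end]` range may be requested again, even after a restart. A plain-Python toy (not the actual Scala `Source` trait; the `commit` callback name is an assumption based on this proposal) sketches both the problem and the proposed fix:

```python
# Toy model of the Source contract. Spark's real trait is Scala; the
# `commit` method here is hypothetical, standing in for the proposed
# scheduler callback.
class BufferedSource:
    def __init__(self):
        self.records = {}    # offset -> record; grows without bound
        self.next_offset = 0

    def append(self, record):
        self.records[self.next_offset] = record
        self.next_offset += 1

    def get_batch(self, start, end):
        # Must return the same data for a given (start, end] forever,
        # so every record after `start` has to stay buffered.
        lo = -1 if start is None else start
        return [self.records[o] for o in range(lo + 1, end + 1)]

    def commit(self, offset):
        # Proposed callback: the scheduler guarantees it will never again
        # request data at or below `offset`, so the source may drop it.
        for o in [o for o in self.records if o <= offset]:
            del self.records[o]

src = BufferedSource()
for r in ["a", "b", "c", "d"]:
    src.append(r)
assert src.get_batch(1, 3) == ["c", "d"]
src.commit(1)                              # offsets 0 and 1 can be dropped
assert src.get_batch(1, 3) == ["c", "d"]   # later batches still answerable
```

Without `commit`, `records` only grows; with it, the buffer is bounded by the gap between the latest offset and the last committed one.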
[jira] [Commented] (SPARK-15370) Some correlated subqueries return incorrect answers
[ https://issues.apache.org/jira/browse/SPARK-15370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15293496#comment-15293496 ] Frederick Reiss commented on SPARK-15370: - [~hvanhovell]'s comments on the first PR pointed out some additional cases that this fix needs to cover. Thanks for the help! I'm running regression tests on a second change set with additional code to cover these other cases. > Some correlated subqueries return incorrect answers > --- > > Key: SPARK-15370 > URL: https://issues.apache.org/jira/browse/SPARK-15370 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Frederick Reiss > > The rewrite introduced in SPARK-14785 has the COUNT bug. The rewrite changes > the semantics of some correlated subqueries when there are tuples from the > outer query block that do not join with the subquery. For example: > {noformat} > spark-sql> create table R(a integer) as values (1); > spark-sql> create table S(b integer); > spark-sql> select R.a from R > > where (select count(*) from S where R.a = S.b) = 0; > Time taken: 2.139 seconds > > spark-sql> > (returns zero rows; the answer should be one row of '1') > {noformat} > This problem also affects the SELECT clause: > {noformat} > spark-sql> select R.a, > > (select count(*) from S where R.a = S.b) as cnt > > from R; > 1 NULL > (the answer should be "1 0") > {noformat} > Some subqueries with COUNT aggregates are *not* affected: > {noformat} > spark-sql> select R.a from R > > where (select count(*) from S where R.a = S.b) > 0; > Time taken: 0.609 seconds > spark-sql> > (Correct answer) > spark-sql> select R.a from R > > where (select count(*) + sum(S.b) from S where R.a = S.b) = 0; > Time taken: 0.553 seconds > spark-sql> > (Correct answer) > {noformat} > Other cases can trigger the variant of the COUNT bug for expressions > involving NULL checks: > {noformat} > spark-sql> select R.a from R > > where (select sum(S.b) is null from S where R.a = S.b); > 
(returns zero rows, should return one row) > {noformat}
[jira] [Commented] (SPARK-15372) TPC-DS Query 84 returns wrong results against TPC official
[ https://issues.apache.org/jira/browse/SPARK-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15288004#comment-15288004 ] Frederick Reiss commented on SPARK-15372: - There is no CONCAT function in the SQL standard. The concatenation operator in the standard is ||. DB2 treats "CONCAT" as a synonym for "||"; other vendors do different things. The standard states that if either input to the concatenation operator is NULL, then the output is NULL (Section 6.28 in the SQL/Foundations document). The TPC-DS benchmark spec doesn't say anything about the allowable semantics of CONCAT, only that "vendor specific syntax can be used". > TPC-DS Query 84 returns wrong results against TPC official > - > > Key: SPARK-15372 > URL: https://issues.apache.org/jira/browse/SPARK-15372 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: JESSE CHEN >Assignee: Herman van Hovell >Priority: Critical > Labels: SPARK-15071 > Fix For: 2.0.0 > > > The official TPC-DS query 84 returns wrong results when compared to its > official answer set.
> The query itself is: > {noformat} > select c_customer_id as customer_id >,concat(c_last_name , ', ' , c_first_name) as customername > from customer > ,customer_address > ,customer_demographics > ,household_demographics > ,income_band > ,store_returns > where ca_city = 'Edgewood' >and c_current_addr_sk = ca_address_sk >and ib_lower_bound >= 38128 >and ib_upper_bound <= 38128 + 5 >and ib_income_band_sk = hd_income_band_sk >and cd_demo_sk = c_current_cdemo_sk >and hd_demo_sk = c_current_hdemo_sk >and sr_cdemo_sk = cd_demo_sk > order by c_customer_id > limit 100; > {noformat} > Spark 2.0 build 0517 returned the following result: > {noformat} > AIPG Carter, Rodney > AKMBBAAA Mcarthur, Emma > CBNHBAAA Wells, Ron > DBME Vera, Tina > DBME Vera, Tina > DHKGBAAA Scott, Pamela > EIIBBAAA Atkins, Susan > FKAH Batiste, Ernest > GHMA Mitchell, Gregory > IAODBAAA Murray, Karen > IEOK Solomon, Clyde > IIBO Owens, David > IPDC Wallace, Eric > IPIM Hayward, Benjamin > JCIK Ramos, Donald > KFJE Roberts, Yvonne > KPGBBAAA NULL < ??? questionable row > LCLABAAA Whitaker, Lettie > MGME Sharp, Michael > MIGBBAAA Montgomery, Jesenia > MPDK Lopez, Isabel > NEOM Powell, Linda > NKPC Shaffer, Sergio > NOCK Vargas, James > OGJEBAAA Owens, Denice > {noformat} > Official answer set (which is correct!) 
> {noformat} > AIPG Carter , Rodney > AKMBBAAA Mcarthur, Emma > CBNHBAAA Wells , Ron > DBME Vera, Tina > DBME Vera, Tina > DHKGBAAA Scott , Pamela > EIIBBAAA Atkins , Susan > FKAH Batiste , Ernest > GHMA Mitchell, Gregory > IAODBAAA Murray , Karen > IEOK Solomon , Clyde > IIBO Owens , David > IPDC Wallace , Eric > IPIM Hayward , Benjamin > JCIK Ramos , Donald > KFJE Roberts , Yvonne > KPGBBAAA Moore , > LCLABAAA Whitaker, Lettie > MGME Sharp , Michael > MIGBBAAA Montgomery , Jesenia > MPDK Lopez , Isabel > NEOM Powell , Linda > NKPC Shaffer , Sergio > NOCK Vargas , James > OGJEBAAA Owens , Denice > {noformat} > The issue is with the "concat" function in Spark SQL (also behaves the same > in Hive). When 'concat' meets any NULL string, it returns NULL as the answer. > But is this right? When I concatenate a person's last name and first name, if > the first name is missing (empty string or NULL), I should see the last name > still, not NULL, i.e., "Smith" + "" = "Smith", not NULL. > Simplest
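The two concatenation behaviors at issue can be sketched in a few lines of plain Python, with `None` standing in for SQL NULL. The NULL-propagating version matches the standard's `||` semantics (and explains the questionable `KPGBBAAA NULL` row), while wrapping each argument in COALESCE gives the "missing first name still prints the last name" behavior the reporter expected:

```python
# Illustrative only: Python stand-ins for the two SQL concat semantics
# discussed above. None plays the role of SQL NULL.
def sql_concat(*args):
    # Standard || semantics: any NULL input makes the whole result NULL.
    if any(a is None for a in args):
        return None
    return "".join(args)

def concat_coalesce(*args):
    # COALESCE(x, '') on each argument: NULLs become empty strings.
    return "".join(a if a is not None else "" for a in args)

assert sql_concat("Moore", ", ", None) is None      # the NULL row in Spark
assert concat_coalesce("Moore", ", ", None) == "Moore, "  # the answer set
```

So both Spark and the official answer set are self-consistent; they simply implement different concat semantics for NULL inputs.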
[jira] [Created] (SPARK-15370) Some correlated subqueries return incorrect answers
Frederick Reiss created SPARK-15370: --- Summary: Some correlated subqueries return incorrect answers Key: SPARK-15370 URL: https://issues.apache.org/jira/browse/SPARK-15370 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.0.0 Reporter: Frederick Reiss The rewrite introduced in SPARK-14785 has the COUNT bug. The rewrite changes the semantics of some correlated subqueries when there are tuples from the outer query block that do not join with the subquery. For example: {noformat} spark-sql> create table R(a integer) as values (1); spark-sql> create table S(b integer); spark-sql> select R.a from R > where (select count(*) from S where R.a = S.b) = 0; Time taken: 2.139 seconds spark-sql> (returns zero rows; the answer should be one row of '1') {noformat} This problem also affects the SELECT clause: {noformat} spark-sql> select R.a, > (select count(*) from S where R.a = S.b) as cnt > from R; 1 NULL (the answer should be "1 0") {noformat} Some subqueries with COUNT aggregates are *not* affected: {noformat} spark-sql> select R.a from R > where (select count(*) from S where R.a = S.b) > 0; Time taken: 0.609 seconds spark-sql> (Correct answer) spark-sql> select R.a from R > where (select count(*) + sum(S.b) from S where R.a = S.b) = 0; Time taken: 0.553 seconds spark-sql> (Correct answer) {noformat} Other cases can trigger the variant of the COUNT bug for expressions involving NULL checks: {noformat} spark-sql> select R.a from R > where (select sum(S.b) is null from S where R.a = S.b); (returns zero rows, should return one row) {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
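A tiny relational simulation (plain Python, tables as lists) shows the mechanics of the COUNT bug described above: evaluated per outer row, `count(*)` is 0 when nothing joins, but the naive aggregate-then-join rewrite drops the non-joining outer row entirely:

```python
# Tables as Python lists, matching the repro above: R(a) has one row,
# S(b) is empty.
R = [1]
S = []

# Correlated-subquery semantics: evaluate COUNT(*) per outer row.
per_row = [a for a in R if len([b for b in S if b == a]) == 0]
assert per_row == [1]        # correct: the R row survives with count 0

# Naive rewrite: pre-aggregate S by b, then inner-join to R on a = b.
agg = {}                     # b -> count(*)
for b in S:
    agg[b] = agg.get(b, 0) + 1
rewrite = [a for a in R if a in agg and agg[a] == 0]
assert rewrite == []         # COUNT bug: the row silently disappears
```

The inner join can never produce a `count(*) = 0` row, which is exactly why predicates like `= 0` (or `IS NULL` on an aggregate) expose the bug while `> 0` does not.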
[jira] [Commented] (SPARK-15122) TPC-DS Query 41 fails with The correlated scalar subquery can only contain equality predicates
[ https://issues.apache.org/jira/browse/SPARK-15122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15271874#comment-15271874 ] Frederick Reiss commented on SPARK-15122: - In the official version of the query, the expression {{i_manufact = i1.i_manufact}} appears twice: once on either side of an {{OR}}. The optimizer needs to normalize the expression enough to factor that subexpression out of the two sides of the disjunction. Also, the error checking code in {{CheckAnalysis.scala}} that triggers the problem needs to trigger *after* that normalization. It looks like that check happens before the call to {{Optimizer.execute}}. > TPC-DS Qury 41 fails with The correlated scalar subquery can only contain > equality predicates > - > > Key: SPARK-15122 > URL: https://issues.apache.org/jira/browse/SPARK-15122 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: JESSE CHEN >Priority: Critical > > The official TPC-DS query 41 fails with the following error: > {noformat} > Error in query: The correlated scalar subquery can only contain equality > predicates: (((i_manufact#38 = i_manufact#16) && (i_category#36 = Women) > && ((i_color#41 = powder) || (i_color#41 = khaki))) && (((i_units#42 = Ounce) > || (i_units#42 = Oz)) && ((i_size#39 = medium) || (i_size#39 = extra > large || (((i_category#36 = Women) && ((i_color#41 = brown) || > (i_color#41 = honeydew))) && (((i_units#42 = Bunch) || (i_units#42 = Ton)) && > ((i_size#39 = N/A) || (i_size#39 = small) || i_category#36 = Men) && > ((i_color#41 = floral) || (i_color#41 = deep))) && (((i_units#42 = N/A) || > (i_units#42 = Dozen)) && ((i_size#39 = petite) || (i_size#39 = large || > (((i_category#36 = Men) && ((i_color#41 = light) || (i_color#41 = > cornflower))) && (((i_units#42 = Box) || (i_units#42 = Pound)) && ((i_size#39 > = medium) || (i_size#39 = extra large))) || ((i_manufact#38 = > i_manufact#16) && (i_category#36 = Women) && ((i_color#41 = midnight) || > 
(i_color#41 = snow))) && (((i_units#42 = Pallet) || (i_units#42 = Gross)) && > ((i_size#39 = medium) || (i_size#39 = extra large || (((i_category#36 = > Women) && ((i_color#41 = cyan) || (i_color#41 = papaya))) && (((i_units#42 = > Cup) || (i_units#42 = Dram)) && ((i_size#39 = N/A) || (i_size#39 = small) > || i_category#36 = Men) && ((i_color#41 = orange) || (i_color#41 = > frosted))) && (((i_units#42 = Each) || (i_units#42 = Tbl)) && ((i_size#39 = > petite) || (i_size#39 = large || (((i_category#36 = Men) && ((i_color#41 > = forest) || (i_color#41 = ghost))) && (((i_units#42 = Lb) || (i_units#42 = > Bundle)) && ((i_size#39 = medium) || (i_size#39 = extra large; > {noformat} > The output plans showed the following errors > {noformat} > == Parsed Logical Plan == > 'GlobalLimit 100 > +- 'LocalLimit 100 >+- 'Sort ['i_product_name ASC], true > +- 'Distinct > +- 'Project ['i_product_name] > +- 'Filter ((('i_manufact_id >= 738) && ('i_manufact_id <= (738 + > 40))) && (scalar-subquery#1 [] > 0)) >: +- 'SubqueryAlias scalar-subquery#1 [] >: +- 'Project ['count(1) AS item_cnt#0] >:+- 'Filter ((('i_manufact = 'i1.i_manufact) && > ('i_category = Women) && (('i_color = powder) || ('i_color = khaki))) && > ((('i_units = Ounce) || ('i_units = Oz)) && (('i_size = medium) || ('i_size = > extra large || ((('i_category = Women) && (('i_color = brown) || > ('i_color = honeydew))) && ((('i_units = Bunch) || ('i_units = Ton)) && > (('i_size = N/A) || ('i_size = small) || 'i_category = Men) && > (('i_color = floral) || ('i_color = deep))) && ((('i_units = N/A) || > ('i_units = Dozen)) && (('i_size = petite) || ('i_size = large || > ((('i_category = Men) && (('i_color = light) || ('i_color = cornflower))) && > ((('i_units = Box) || ('i_units = Pound)) && (('i_size = medium) || ('i_size > = extra large))) || (('i_manufact = 'i1.i_manufact) && ('i_category = > Women) && (('i_color = midnight) || ('i_color = snow))) && ((('i_units = > Pallet) || ('i_units = Gross)) && (('i_size = medium) || 
('i_size = extra > large || ((('i_category = Women) && (('i_color = cyan) || ('i_color = > papaya))) && ((('i_units = Cup) || ('i_units = Dram)) && (('i_size = N/A) || > ('i_size = small) || 'i_category = Men) && (('i_color = orange) || > ('i_color = frosted))) && ((('i_units = Each) || ('i_units = Tbl)) && > (('i_size = petite) || ('i_size = large || ((('i_category = Men) && > (('i_color = forest) || ('i_color = ghost))) && ((('i_units = Lb) || > ('i_units = Bundle)) && (('i_size = medium) || ('i_size = extra large >
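The normalization step suggested in the comment, factoring a conjunct that appears on every branch of a disjunction out in front, can be sketched as follows. This is a simplified model with predicates as strings rather than Catalyst expression trees:

```python
# Each branch of the OR is modeled as a set of conjunct strings
# (a deliberate simplification of Catalyst's expression trees).
def factor_disjunction(branches):
    """Return (common conjuncts, per-branch leftovers) for an OR."""
    common = set.intersection(*branches)
    rest = [branch - common for branch in branches]
    return common, rest

branches = [
    {"i_manufact = i1.i_manufact", "i_category = 'Women'"},
    {"i_manufact = i1.i_manufact", "i_category = 'Men'"},
]
common, rest = factor_disjunction(branches)
# The correlated equality becomes a single top-level conjunct, so an
# equality-only check on correlated predicates could pass, provided it
# runs after this normalization rather than before.
assert common == {"i_manufact = i1.i_manufact"}
```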
[jira] [Commented] (SPARK-14781) Support subquery in nested predicates
[ https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15265000#comment-15265000 ] Frederick Reiss commented on SPARK-14781: - Yeah, Distinct will impact performance for the uncorrelated case if the subquery returns more than a few million rows. That problem won't occur in the particular case of TPC-DS query 45 (the subquery there returns at most 500k rows at a 100TB scale factor), but you never know. And of course a Distinct after the join, as one would need to cover EXISTS, would see potentially billions of rows. I just figured I'd mention that possibility as an expedient that doesn't require any additional operators. I'd be up for adding a "LeftSemiPlus" mode to the various join operators if you'd prefer for implementation to start with that step. The new behavior is almost the same as the existing LeftSemi mode: one additional output column in the schema, plus code to emit rows with a null value when nothing on the inner matches an outer tuple. > Support subquery in nested predicates > - > > Key: SPARK-14781 > URL: https://issues.apache.org/jira/browse/SPARK-14781 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Davies Liu > > Right now, we does not support nested IN/EXISTS subquery, for example > EXISTS( x1) OR EXISTS( x2) > In order to do that, we could use an internal-only join type SemiPlus, which > will output every row from left, plus additional column as the result of join > condition. Then we could replace the EXISTS() or IN() by the result column.
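A rough plain-Python sketch of the "LeftSemiPlus" behavior described in the comment above (rows as plain values; following the comment, the extra column is null rather than false when nothing on the inner side matches):

```python
# Illustrative model of the proposed LeftSemiPlus join mode; not actual
# Spark code. `cond` is the join condition.
def left_semi_plus(left, right, cond):
    out = []
    for l in left:
        matched = any(cond(l, r) for r in right)
        # One output row per left row, like LeftSemi would emit, plus an
        # extra column: True on a match, null (None) when nothing matched.
        out.append((l, True if matched else None))
    return out

rows = left_semi_plus([1, 2], [2, 4], lambda l, r: l == r)
assert rows == [(1, None), (2, True)]
```

The extra column is what would replace the `EXISTS()`/`IN()` predicate in the rewritten plan.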
[jira] [Commented] (SPARK-14781) Support subquery in nested predicates
[ https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15264789#comment-15264789 ] Frederick Reiss commented on SPARK-14781: - [~davies]: I think I have a minimally-invasive plan for covering Q45. *Existing code:* {{RewritePredicateSubquery.apply}} applies to conjunctions of predicates in the WHERE clause. When this rule finds an IN predicate with a subquery, the rule rewrites the IN predicate into a join. *Proposed change:* Modify {{RewritePredicateSubquery}} so that it also detects disjunctions (ORs) where exactly one child of the disjunction is an IN predicate with a non-correlated subquery. Rewrite each such disjunction into a left outer join, followed by a Filter. The inner (right) operand of the left outer join should be the subquery with an additional Distinct operator above it. The Filter will apply the remaining predicates from the disjunction to any tuples that did not join with the subquery. *Notes:* The Distinct here is needed because the in-list could contain duplicates. The Distinct could be eliminated if there was a join operator that combined the behavior of LeftOuter and LeftSemijoin. I suppose that's what SemiPlus will do? This approach could be extended to cover correlated IN/EXISTS subqueries. The rewrite would need to add unique IDs to the outer query's tuples before the join + filter, then remove duplicates after the join + filter. I'm *not* planning to do this extension in the first pass. The approach could also be extended to cover multiple subqueries inside a disjunction by chaining together multiple outer joins. I'm *not* planning to do this extension in the first pass. *Questions:* * Do you foresee any problems with this approach? * There is a second version of the IN/EXISTS subquery rewrite logic in PR #12720, but that code hasn't been merged yet. Would you prefer a diff against the current head; or a diff against the logic in PR 12720? 
> Support subquery in nested predicates > - > > Key: SPARK-14781 > URL: https://issues.apache.org/jira/browse/SPARK-14781 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Davies Liu > > Right now, we does not support nested IN/EXISTS subquery, for example > EXISTS( x1) OR EXISTS( x2) > In order to do that, we could use an internal-only join type SemiPlus, which > will output every row from left, plus additional column as the result of join > condition. Then we could replace the EXISTS() or IN() by the result column.
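The proposed rewrite of `p OR x IN (subquery)` can be modeled in a few lines of plain Python over lists: the set models the Distinct over the subquery, set membership models the left outer join, and the final predicate check models the Filter on the non-joining rows:

```python
# Illustrative model of the proposed rewrite; not Spark code. `key`
# extracts the IN-list expression from an outer row, and `other_pred`
# is the rest of the disjunction.
def rewrite_or_in(outer, key, subquery_rows, other_pred):
    in_list = set(subquery_rows)          # Distinct over the subquery
    result = []
    for row in outer:
        joined = key(row) in in_list      # left outer join found a match
        if joined or other_pred(row):     # Filter handles non-joining rows
            result.append(row)
    return result

rows = rewrite_or_in(
    outer=[("85669", "itemA"), ("99999", "itemB"), ("99999", "itemC")],
    key=lambda r: r[1],                   # hypothetical i_item_id column
    subquery_rows=["itemC", "itemC"],     # in-list may contain duplicates
    other_pred=lambda r: r[0] == "85669", # e.g. the substr(ca_zip,...) IN (...)
)
assert rows == [("85669", "itemA"), ("99999", "itemC")]
```

The Distinct matters because a duplicated in-list entry must not duplicate outer rows; a combined LeftOuter/LeftSemi operator (SemiPlus) would make it unnecessary.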
[jira] [Commented] (SPARK-14781) Support subquery in nested predicates
[ https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261474#comment-15261474 ] Frederick Reiss commented on SPARK-14781: - Sure, I'd be happy to put something together to cover Q45. Tomorrow is a mess, but I'll have time on Friday and Monday. > Support subquery in nested predicates > - > > Key: SPARK-14781 > URL: https://issues.apache.org/jira/browse/SPARK-14781 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Davies Liu > > Right now, we does not support nested IN/EXISTS subquery, for example > EXISTS( x1) OR EXISTS( x2) > In order to do that, we could use an internal-only join type SemiPlus, which > will output every row from left, plus additional column as the result of join > condition. Then we could replace the EXISTS() or IN() by the result column.
[jira] [Commented] (SPARK-14785) Support correlated scalar subquery
[ https://issues.apache.org/jira/browse/SPARK-14785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15261157#comment-15261157 ] Frederick Reiss commented on SPARK-14785: - Note that the rewritten query in the example above needs an additional filter on {{t.b > t3.AVG}}. This rewrite is described in [a 1982 paper|https://pdfs.semanticscholar.org/e8ac/c9f63559a09d608e62c4061520c24d970c31.pdf]. This rewrite does not always give correct query results. In particular, the rewritten query may be missing results if the subquery contains a {{COUNT}} aggregate, or if the subquery sometimes returns {{NULL}} and the filtering predicate returns {{TRUE}} on a {{NULL}} input. See the chapter "Query Rewrite Optimization Rules in IBM DB2 Universal Database" in _Readings in Database Systems_ for more information. For the purpose of covering TPC-DS, this rewrite should work for queries 6, 32, 81, and 92. Those queries only use {{AVG}} aggregates in their subqueries. > Support correlated scalar subquery > -- > > Key: SPARK-14785 > URL: https://issues.apache.org/jira/browse/SPARK-14785 > Project: Spark > Issue Type: New Feature >Reporter: Davies Liu > > For example: > {code} > SELECT a from t where b > (select avg(c) from t2 where t.id = t2.id) > {code} > it could be rewritten as > {code} > SELECT a FROM t JOIN (SELECT id, AVG(c) FROM t2 GROUP by id) t3 ON t3.id = > t.id > {code} > TPCDS Q92, Q81, Q6 required this
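A small numeric check of the rewrite above (plain Python, tables as lists of tuples), including the additional `t.b > t3.AVG` filter the comment points out; the sample data here is made up for illustration:

```python
# Hypothetical sample data for t(id, b) and t2(id, c).
t = [(1, 10), (2, 3), (3, 99)]
t2 = [(1, 5), (1, 7), (2, 8)]

# t3: SELECT id, AVG(c) FROM t2 GROUP BY id
sums = {}
for tid, c in t2:
    s, n = sums.get(tid, (0, 0))
    sums[tid] = (s + c, n + 1)
avgs = {tid: s / n for tid, (s, n) in sums.items()}

# Join t to t3 on id, then re-apply the comparison as a filter.
result = [tid for tid, b in t if tid in avgs and b > avgs[tid]]
# id=1: 10 > 6 passes; id=2: 3 > 8 fails; id=3: no t2 rows, so the inner
# join drops it, which happens to be correct for an AVG comparison
# (the subquery would return NULL and the predicate would not hold).
assert result == [1]
```

The same inner join is what goes wrong for COUNT: there, the dropped non-joining rows should have seen `count(*) = 0`, not NULL.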
[jira] [Commented] (SPARK-14781) Support subquery in nested predicates
[ https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261032#comment-15261032 ] Frederick Reiss commented on SPARK-14781:
-
[~davies] where is the definition of the SemiPlus operator? I don't see it anywhere in my copy of the main trunk.

> Support subquery in nested predicates
> -
>
> Key: SPARK-14781
> URL: https://issues.apache.org/jira/browse/SPARK-14781
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Reporter: Davies Liu
>
> Right now, we do not support nested IN/EXISTS subqueries, for example
> EXISTS( x1) OR EXISTS( x2)
> In order to do that, we could use an internal-only join type SemiPlus, which
> will output every row from the left, plus an additional column as the result of the join
> condition. Then we could replace the EXISTS() or IN() by the result column.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14781) Support subquery in nested predicates
[ https://issues.apache.org/jira/browse/SPARK-14781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15261028#comment-15261028 ] Frederick Reiss commented on SPARK-14781:
-
I'm not so sure about Q45. Here's the template for Q45:
{noformat}
[_LIMITA] select [_LIMITB] ca_zip, [GBOBC],
       sum(ws_sales_price)
from web_sales, customer, customer_address, date_dim, item
where ws_bill_customer_sk = c_customer_sk
  and c_current_addr_sk = ca_address_sk
  and ws_item_sk = i_item_sk
  and ( substr(ca_zip,1,5) in ('85669', '86197','88274','83405','86475',
                               '85392', '85460', '80348', '81792')
        or
        i_item_id in (select i_item_id
                      from item
                      where i_item_sk in (2, 3, 5, 7, 11, 13, 17, 19, 23, 29)
                     )
      )
  and ws_sold_date_sk = d_date_sk
  and d_qoy = [QOY] and d_year = [YEAR]
group by ca_zip, [GBOBC]
order by ca_zip, [GBOBC]
[_LIMITC];
{noformat}
This query does contain a subquery inside a disjunction ({{...or i_item_id in (select...}}), but that subquery is not correlated. What is needed there is for that subquery to be added to the list of noncorrelated subqueries evaluated in {{SparkPlan.waitForSubqueries()}}, with a placeholder for its results inserted into the plan. Q10 and Q35 have correlated EXISTS subqueries inside disjunctions.

> Support subquery in nested predicates
> -
>
> Key: SPARK-14781
> URL: https://issues.apache.org/jira/browse/SPARK-14781
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Reporter: Davies Liu
>
> Right now, we do not support nested IN/EXISTS subqueries, for example
> EXISTS( x1) OR EXISTS( x2)
> In order to do that, we could use an internal-only join type SemiPlus, which
> will output every row from the left, plus an additional column as the result of the join
> condition. Then we could replace the EXISTS() or IN() by the result column.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
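The two-phase treatment sketched above (evaluate the noncorrelated subquery once up front, then splice its result into the main query like a literal IN-list) can be illustrated in miniature; the data and names below are invented for illustration and are not Spark's actual API:

```python
# Illustrative-only model of evaluating a noncorrelated IN-subquery once,
# analogous to the waitForSubqueries() step mentioned above.
item = [{"i_item_sk": k, "i_item_id": "ID%d" % k} for k in range(1, 8)]
web_sales = [
    {"i_item_id": "ID2", "ca_zip": "856690000"},
    {"i_item_id": "ID6", "ca_zip": "999990000"},
]

# Phase 1: the subquery does not reference the outer row, so it can be
# evaluated exactly once before the main scan starts.
subquery_result = {r["i_item_id"] for r in item if r["i_item_sk"] in {2, 3, 5, 7}}

# Phase 2: the main query treats the precomputed result as a constant IN-list,
# so the disjunction needs no special join machinery.
zips = {"85669", "86197", "88274"}
result = [r for r in web_sales
          if r["ca_zip"][:5] in zips or r["i_item_id"] in subquery_result]
print(result)  # only the first sale qualifies (by zip prefix and by item id)
```

A correlated subquery inside the disjunction (as in Q10/Q35) cannot be hoisted this way, because phase 1 would need a different result per outer row.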
[jira] [Commented] (SPARK-10936) UDAF "mode" for categorical variables
[ https://issues.apache.org/jira/browse/SPARK-10936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14949306#comment-14949306 ] Frederick Reiss commented on SPARK-10936:
-
Mode is not an algebraic aggregate. To find the mode in a single pass over the original data, one needs to track the full set of distinct values in the underlying set, as well as the number of times each value occurs in the records seen so far. For high-cardinality columns, this requirement will result in unbounded state.

I can see three ways forward here:

a) Stuff a hash table into the PartialAggregate2 API's result buffer, and hope that this buffer does not exhaust the heap, produce O(n^2) behavior when the column cardinality is high, or stop working on future (or present?) versions of codegen.

b) Implement an approximate mode with fixed-size intermediate state (for example, a compressed reservoir sample), similar to how the current HyperLogLog++ aggregate works. Approximate computation of the mode will work well most of the time but will have unbounded error in corner cases.

c) Add mode as another member of the "distinct" family of Spark aggregates, such as SUM/COUNT/AVERAGE DISTINCT. Use the same pre-Tungsten style of processing to handle mode for now. Create a follow-on JIRA to move mode over to the fast path at the same time that the other DISTINCT aggregates switch over.

I think that (c) is the best option overall, but I'm happy to defer to others with deeper understanding. My thinking is that, while it would be good to have a mode aggregate available, mode is a relatively uncommon use case. Slow-path processing for mode is ok as a short-term expedient. Once SUM DISTINCT and related aggregates are fully moved onto the new framework, transitioning mode to the fast path should be easy.
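For concreteness, here is a rough sketch of what option (a)'s hash-table accumulator looks like in the usual partial/merge/final aggregate shape; the function names are illustrative, not the PartialAggregate2 API:

```python
from collections import Counter

# Sketch of an exact mode aggregate whose intermediate buffer is a hash
# table of value -> count. The buffer grows with column cardinality,
# which is the unbounded-state problem described above.

def partial(values):
    return Counter(values)  # one buffer entry per distinct value in the partition

def merge(buf1, buf2):
    buf1.update(buf2)       # add counts from the other partition's buffer
    return buf1

def final(buf):
    return buf.most_common(1)[0][0] if buf else None

partitions = [["a", "b", "a"], ["b", "b", "c"]]
buf = Counter()
for p in partitions:
    buf = merge(buf, partial(p))
print(final(buf))  # 'b' (3 occurrences across both partitions)
```

The partial/merge/final decomposition is what makes the aggregate distributable; the catch is purely that the merge buffers are O(distinct values) in size.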
> UDAF "mode" for categorical variables > - > > Key: SPARK-10936 > URL: https://issues.apache.org/jira/browse/SPARK-10936 > Project: Spark > Issue Type: Sub-task > Components: ML, SQL >Reporter: Xiangrui Meng > > This is similar to frequent items except that we don't have a threshold on > the frequency. So an exact implementation might require a global shuffle. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6649) DataFrame created through SQLContext.jdbc() failed if columns table must be quoted
[ https://issues.apache.org/jira/browse/SPARK-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14540967#comment-14540967 ] Frederick Reiss commented on SPARK-6649:
-
Did some more digging through the code and edit history. It looks like the current version of the SQL parser uses the backtick character (`) as a delimiter. This change first appeared in SPARK-3483, with additional fixes done as part of SPARK-6898. I don't see any explanation for the use of a nonstandard quote character, and there doesn't seem to be any relevant discussion in the mailing list archives.
I've only found two test cases under org.apache.spark.sql and child packages that use double quotes to delimit a string literal. In SQLQuerySuite.scala:
{noformat}
test("date row") {
  checkAnswer(sql(
    """select cast("2015-01-28" as date) from testData limit 1"""),
    ...
{noformat}
And in TableScanSuite.scala:
{noformat}
test("SPARK-5196 schema field with comment") {
  sql(
    """
      |CREATE TEMPORARY TABLE student(name string comment "SN", age int comment "SA", grade int)
      |USING org.apache.spark.sql.sources.AllDataTypesScanSource
      |OPTIONS (
      |  from '1',
      |  to '10'
      |)
    ...
{noformat}
All the other test cases use single quotes per the SQL standard. I'm going to assume that the use of double quotes to delimit string literals was an oversight and make changes to correct that oversight.

> DataFrame created through SQLContext.jdbc() failed if columns table must be quoted
> --
>
> Key: SPARK-6649
> URL: https://issues.apache.org/jira/browse/SPARK-6649
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.3.0
> Reporter: Frédéric Blanc
> Priority: Minor
>
> If I want to import the content of a table from Oracle that contains a column named COMMENT (a reserved keyword), I cannot use a DataFrame that maps all the columns of this table.
> {code:title=ddl.sql|borderStyle=solid}
> CREATE TABLE TEST_TABLE (
>     COMMENT VARCHAR2(10)
> );
> {code}
> {code:title=test.java|borderStyle=solid}
> SQLContext sqlContext = ...
> DataFrame df = sqlContext.jdbc(databaseURL, "TEST_TABLE");
> df.rdd();  // => failed if the table contains a column with a reserved keyword
> {code}
> The same problem can be encountered if a reserved keyword is used as the table name.
> The JDBCRDD Scala class could be improved if the columnList initializer appended the double quote for each column. (line: 225)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6649) DataFrame created through SQLContext.jdbc() failed if columns table must be quoted
[ https://issues.apache.org/jira/browse/SPARK-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14527738#comment-14527738 ] Frederick Reiss commented on SPARK-6649:
-
I was able to reproduce this problem on Spark 1.3.1. The Spark SQL lexical analyzer treats anything enclosed in double quotes as a string. The Spark SQL parser only allows strings to be treated as literals, not identifiers.
On the spark-sql command line:
{noformat}
spark-sql> select "hello" as hello, 'world' as world;
hello world
Time taken: 0.125 seconds, Fetched 1 row(s)
spark-sql> select "hello" as "hello", 'world' as "world";
15/05/04 18:03:05 ERROR SparkSQLDriver: Failed in [select "hello" as "hello", 'world' as "world"]
org.apache.spark.sql.AnalysisException: cannot recognize input near 'as' '"hello"' ',' in selection target; line 1 pos 18
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:254)
[many lines of stack trace]
{noformat}
The same thing happens from the Spark Scala shell:
{noformat}
scala> val df = sqlContext.sql("select \"hello\" as hello, 'world' as world")
df: org.apache.spark.sql.DataFrame = [hello: string, world: string]
scala> val df2 = sqlContext.sql("select \"hello\" as \"hello\", 'world' as \"world\"")
org.apache.spark.sql.AnalysisException: cannot recognize input near 'as' '"hello"' ',' in selection target; line 1 pos 18
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:254)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
[many lines of stack trace]
{noformat}
This behavior is not consistent with the SQL standard, though I suppose it is somewhat consistent with MySQL's default behavior. According to the grammar in the SQL-92 document ([http://www.contrib.andrew.cmu.edu/~shadow/sql/sql1992.txt]), strings should be delimited by single quotes:
{noformat}
<character string literal> ::=
    [ <introducer><character set specification> ]
    <quote> [ <character representation>... ] <quote>
    [ { <separator>... <quote> [ <character representation>... ] <quote> }... ]
...
<national character string literal> ::=
    N <quote> [ <character representation>... ] <quote>
    [ { <separator>... <quote> [ <character representation>... ] <quote> }... ]
<bit string literal> ::=
    B <quote> [ <bit>... ] <quote>
    [ { <separator>... <quote> [ <bit>... ] <quote> }... ]
<hex string literal> ::=
    X <quote> [ <hexit>... ] <quote>
    [ { <separator>... <quote> [ <hexit>... ] <quote> }... ]
...
<quote> ::= '
{noformat}
and identifiers *may* be delimited with double quotes:
{noformat}
<delimited identifier> ::=
    <double quote> <delimited identifier body> <double quote>
...
<double quote> ::= "
{noformat}
Thoughts? Are there any pull requests in flight that fix this problem already?

> DataFrame created through SQLContext.jdbc() failed if columns table must be quoted
> --
>
> Key: SPARK-6649
> URL: https://issues.apache.org/jira/browse/SPARK-6649
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.3.0
> Reporter: Frédéric Blanc
> Priority: Minor
>
> If I want to import the content of a table from Oracle that contains a column named COMMENT (a reserved keyword), I cannot use a DataFrame that maps all the columns of this table.
> {code:title=ddl.sql|borderStyle=solid}
> CREATE TABLE TEST_TABLE (
>     COMMENT VARCHAR2(10)
> );
> {code}
> {code:title=test.java|borderStyle=solid}
> SQLContext sqlContext = ...
> DataFrame df = sqlContext.jdbc(databaseURL, "TEST_TABLE");
> df.rdd();  // => failed if the table contains a column with a reserved keyword
> {code}
> The same problem can be encountered if a reserved keyword is used as the table name.
> The JDBCRDD Scala class could be improved if the columnList initializer appended the double quote for each column. (line: 225)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
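The SQL-92 distinction quoted above (single quotes delimit string literals, double quotes delimit identifiers, and each delimiter is escaped by doubling it) amounts to a small lexer rule; a toy sketch, not Spark's actual lexer:

```python
import re

# Toy tokenizer: the first two alternatives implement the SQL-92 rule that
# '...' is a string literal and "..." is a delimited identifier.
TOKEN = re.compile(r"'(?:[^']|'')*'|\"(?:[^\"]|\"\")*\"|\w+|\S")

def classify(tok):
    if tok.startswith("'"):
        return ("literal", tok[1:-1].replace("''", "'"))
    if tok.startswith('"'):
        return ("identifier", tok[1:-1].replace('""', '"'))
    return ("word", tok)

toks = [classify(t) for t in TOKEN.findall('select "hello" as "greeting", \'world\'')]
print(toks)
# "hello" and "greeting" come back as identifiers; 'world' as a string literal.
```

Under this rule, the failing queries above are perfectly legal: the double-quoted tokens in the alias position are identifiers, which Spark's lexer of that era instead tokenized as string literals.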
[jira] [Commented] (SPARK-7043) KryoSerializer cannot be used with REPL to interpret code in which case class definition and its shipping are in the same line
[ https://issues.apache.org/jira/browse/SPARK-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14527683#comment-14527683 ] Frederick Reiss commented on SPARK-7043:
-
Both issues appear to have the same root cause: the interpreter's rewrites create a class with a transient reference to the Spark context, and Kryo creates a second context during deserialization.

> KryoSerializer cannot be used with REPL to interpret code in which case class definition and its shipping are in the same line
> --
>
> Key: SPARK-7043
> URL: https://issues.apache.org/jira/browse/SPARK-7043
> Project: Spark
> Issue Type: Bug
> Components: Spark Shell
> Affects Versions: 1.3.1
> Environment: Ubuntu 14.04, no hadoop
> Reporter: Peng Cheng
> Priority: Minor
> Labels: classloader, kryo
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> When deploying spark-shell with the spark.serializer=org.apache.spark.serializer.KryoSerializer option, spark-shell cannot execute the following code (in 1 line):
> case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
> This problem doesn't exist with either JavaSerializer or the code split into 2 lines. The only possible explanation is that KryoSerializer is using a ClassLoader that is not registered as a subsidiary ClassLoader of the one in the REPL. A dirty fix would be just breaking input by semicolon, but it's better to fix the ClassLoader to avoid other liabilities.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7043) KryoSerializer cannot be used with REPL to interpret code in which case class definition and its shipping are in the same line
[ https://issues.apache.org/jira/browse/SPARK-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14527575#comment-14527575 ] Frederick Reiss commented on SPARK-7043:
-
I did some looking into this issue. Here's a summary of what I've found:
The interpreter rewrites each command into a pair of generated classes, one for the read phase and one for the eval phase of executing the statement. For the command:
{{case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()}}
this rewrite process produces the following class (simplified here by removing empty constructors) to cover the read phase:
{noformat}
class $read extends Serializable {
  class $iwC extends Serializable {
    val $VAL10 = $line3.$read.INSTANCE;
    import $VAL10.$iw.$iw.sc;
    class $iwC extends Serializable {
      case class Foo extends scala.Product with scala.Serializable {
        <caseaccessor> <paramaccessor> val i: Int = _;
      };
      val ret = sc.parallelize(1.to(100).map(Foo), 10).collect()
    };
    val $iw = new $iwC()
  };
  val $iw = new $iwC()
}
{noformat}
Note the lines
{{val $VAL10 = $line3.$read.INSTANCE;}}
{{import $VAL10.$iw.$iw.sc;}}
These lines pull the symbol {{sc}} into the scope of the class {{$read.$iwC}}. They also happen to give the class {{$read.$iwC}} a field of type {{$line3.$read.$iwC}}. I haven't been able to get at the definition of the generated class or object {{$line3}}, but it appears that {{$line3}} is a rewritten version of the Scala code
{noformat}
@transient val sc = {
  val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
  println("Spark context available as sc.")
  _sc
}
{noformat}
which runs whenever the interpreter starts up. The generated code for the current line gets further bundled inside a generated object or class (I'm not sure which) that represents the current line's state transitions. In the trace I'm looking at, this object/class is called {{$line19}}.
As a result, the case class {{Foo}} from the command line turns into the case class {{$line19.$read.$iwC.$iwC.Foo}}, which the Scala compiler turns into a Java inner class by the same name. The call to {{sc.parallelize(1.to(100).map(Foo), 10)}} causes Spark to serialize an array of instances of {{$line19.$read.$iwC.$iwC.Foo}}. The serializer serializes each of these inner class objects, along with their parent class objects. During this serialization process, the serializer skips the pointer to the SparkContext, since that pointer is marked as transient. Inside of the call to {{collect()}}, the deserializer reconstructs instances of {{$line19.$read.$iwC.$iwC.Foo}}. Before creating such an instance, the deserializer needs to create an enclosing instance of {{$line19.$read.$iwC.$iwC}}, which needs an enclosing instance of {{$line19.$read.$iwC}}. The class {{$line19.$read.$iwC}} contains a field ({{val $VAL10 = $line3.$read.INSTANCE}}) that has a pointer to a field of an object of type {{$line3.$read.$iwC}}. The type {{$line3.$read.$iwC}} in turn has a transient field called sc somewhere underneath it. At some point during initializing the object of type {{$line3.$read.$iwC}}, the Kryo deserializer makes a call to the static initializer for the transient field sc. This static initializer calls {{org.apache.spark.repl.Main.interp.createSparkContext()}}, which attempts and fails to create a second SparkContext. The deserialization process fails with an exception. I'm not sure exactly why the Java deserializer doesn't crash in the same way. I suspect that, when deserializing the transient field sc, Java's deserializer sets that field to null and moves on. I'm not sure which way of initializing the transient field is correct. The serialization spec ([http://docs.oracle.com/javase/7/docs/platform/serialization/spec/serialTOC.html]) is vague about default values for transient fields. 
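The suspected behavior of Java's deserializer (leave the transient field at its default rather than re-running any initializer) can be illustrated with a rough Python analogy, with pickle's {{__getstate__}} standing in for a transient field; this is an analogy only, not the Kryo or JVM code path:

```python
import pickle

# A field excluded from the serialized state comes back as its placeholder
# default after deserialization, and no initializer re-runs to recreate it.
class Holder:
    def __init__(self):
        self.context = object()  # stands in for the REPL's transient `sc`
        self.payload = 42

    def __getstate__(self):
        state = self.__dict__.copy()
        state["context"] = None  # "transient": dropped from the serialized form
        return state

back = pickle.loads(pickle.dumps(Holder()))
print(back.payload)  # 42
print(back.context)  # None -- nothing attempts to rebuild the context
```

The crash described above corresponds to the opposite policy: Kryo's reconstruction of the enclosing instances runs initialization code, which tries to create a second SparkContext.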
KryoSerializer cannot be used with REPL to interpret code in which case class definition and its shipping are in the same line -- Key: SPARK-7043 URL: https://issues.apache.org/jira/browse/SPARK-7043 Project: Spark Issue Type: Bug Components: Spark Shell Affects Versions: 1.3.1 Environment: Ubuntu 14.04, no hadoop Reporter: Peng Cheng Priority: Minor Labels: classloader, kryo Original Estimate: 48h Remaining Estimate: 48h When deploying Spark-shell with spark.serializer=org.apache.spark.serializer.KryoSerializer option. Spark-shell cannot execute the following code (in 1 line): case class Foo(i: Int);val ret = sc.parallelize((1 to 100).map(Foo), 10).collect()
[jira] [Commented] (SPARK-7273) The SQLContext.jsonFile() api has a problem when load a format json file?
[ https://issues.apache.org/jira/browse/SPARK-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14523384#comment-14523384 ] Frederick Reiss commented on SPARK-7273:
-
The error in the description indicates that there is a character in the middle of the first line of the JSON file that TextInputFormat treats as a line separator, so Spark sees the JSON content as two separate records. I can think of two potential causes:
a) Steven's JSON content has run through a pretty-printing function, and there is a newline character between the two parts of the JSON object, or
b) Steven's local Hadoop/YARN configuration has a nonstandard setting for textinputformat.record.delimiter.
[~jiege]: Can you share a copy of your JSON file?
Technical details: SQLContext.jsonFile() makes a call to org.apache.spark.sql.json.DefaultSource, which delegates the task to org.apache.spark.sql.json.JSONRelation, which uses SparkContext.textFile() to open the JSON file. SparkContext.textFile() uses TextInputFormat to read the file.

> The SQLContext.jsonFile() api has a problem when load a format json file?
> -
>
> Key: SPARK-7273
> URL: https://issues.apache.org/jira/browse/SPARK-7273
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.3.1
> Reporter: steven
> Priority: Minor
>
> my code as follows:
> val df = sqlContext.jsonFile("test.json");
> test.json content is:
> { "name": "steven", "age" : 20 }
> invoking jsonFile throws an exception as follows:
> java.lang.RuntimeException: Failed to parse record "age" : 20}. Please make sure that each line of the file (or each string in the RDD) is a valid JSON object or an array of JSON objects.
> at scala.sys.package$.error(package.scala:27)
> at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:313)
> at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:307)
> is it a bug?
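The per-line constraint described above is easy to demonstrate with a sketch (plain-Python json as a stand-in for Spark's parser, not the JsonRDD code): parsing each line independently, the way TextInputFormat's line splitting forces jsonFile to do, fails on pretty-printed input.

```python
import json

# Each LINE must be a complete JSON value; a newline in the middle of an
# object leaves a fragment like `"age": 20 }` on its own line, which is
# exactly the fragment shown in the reported error.
one_line = '{ "name": "steven", "age": 20 }'
pretty = '{ "name": "steven",\n  "age": 20 }'  # newline mid-object

def parse_lines(text):
    return [json.loads(line) for line in text.splitlines()]

print(parse_lines(one_line))  # [{'name': 'steven', 'age': 20}]
try:
    parse_lines(pretty)
except json.JSONDecodeError:
    print("failed on the fragment, matching the reported error")
```

So the behavior is working as designed for line-delimited JSON; the fix on the user side is one JSON object per line.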
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org