[jira] [Updated] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally
[ https://issues.apache.org/jira/browse/SPARK-37536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-37536:

Description:
We have been using Spark in local mode as a small embedded, in-memory SQL DB for microservices. Spark's powerful SQL features and flexibility enable developers to build efficient data-querying solutions. Because our solution deals with small datasets that must be queried through SQL at very low latencies, we found the embedded approach a very good model.

We found through experimentation that Spark in local mode gains significant performance improvements (on average between 20-30%) by disabling the shuffling on aggregation operations - ShuffleExchangeExec.

I will be raising a PR to propose introducing a new configuration variable *spark.sql.localMode.shuffle.enabled*. This variable will default to true and will be checked at QueryExecution EnsureRequirements creation time; in conjunction with a check that Spark is running in local mode, it will keep the execution plans unchanged if the value is false.

Looking forward to any comments and feedback.

was:
We have been using Spark in local mode as a small embedded, in-memory SQL DB for microservices. Spark's powerful SQL features and flexibility enable developers to build efficient data-querying solutions. Because our solution deals with small datasets that must be queried through SQL at very low latencies, we found the embedded approach a very good model.

We found through experimentation that Spark in local mode gains significant performance improvements (on average between 20-30%) by disabling the shuffling on aggregation operations. The shuffling is introduced by expanding the query execution plan with ShuffleExchangeExec or BroadcastExchangeExec nodes.

I will be raising a PR to propose introducing a new configuration variable *spark.sql.localMode.shuffle.enabled*. This variable will default to true and will be checked at QueryExecution EnsureRequirements creation time; in conjunction with a check that Spark is running in local mode, it will keep the execution plans unchanged if the value is false.

Looking forward to any comments and feedback.

> Allow for API user to disable Shuffle Operations while running locally
> ----------------------------------------------------------------------
>
> Key: SPARK-37536
> URL: https://issues.apache.org/jira/browse/SPARK-37536
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.3.0
> Environment: Spark running in local mode
> Reporter: Rodrigo Boavida
> Priority: Major

--
This message was sent by Atlassian Jira (v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
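For context, a minimal sketch of how an application might opt out of shuffles under the proposal. Note that *spark.sql.localMode.shuffle.enabled* is the configuration proposed in this ticket, not an existing Spark option; *spark.sql.shuffle.partitions* is an existing setting that already helps small local datasets today.

```scala
// Sketch only: spark.sql.localMode.shuffle.enabled is the flag proposed in
// SPARK-37536 and does not exist in released Spark versions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object LocalNoShuffleExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                                     // embedded, in-process mode
      .appName("embedded-sql-db")
      .config("spark.sql.localMode.shuffle.enabled", "false") // proposed flag (hypothetical)
      .config("spark.sql.shuffle.partitions", "1")            // existing knob for small data
      .getOrCreate()

    // A grouped aggregation that would normally insert a ShuffleExchangeExec.
    val df = spark.range(100).groupBy((col("id") % 10).as("k")).count()

    // Under the proposal, with the flag set to false in local mode,
    // EnsureRequirements would leave the plan without an exchange node.
    df.explain()
  }
}
```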
[jira] [Updated] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally
[ https://issues.apache.org/jira/browse/SPARK-37536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-37536:

Description:
We have been using Spark in local mode as a small embedded, in-memory SQL DB for microservices. Spark's powerful SQL features and flexibility enable developers to build efficient data-querying solutions. Because our solution deals with small datasets that must be queried through SQL at very low latencies, we found the embedded approach a very good model.

We found through experimentation that Spark in local mode gains significant performance improvements (on average between 20-30%) by disabling the shuffling on aggregation operations. The shuffling is introduced by expanding the query execution plan with ShuffleExchangeExec or BroadcastExchangeExec nodes.

I will be raising a PR to propose introducing a new configuration variable *spark.sql.localMode.shuffle.enabled*. This variable will default to true and will be checked at QueryExecution EnsureRequirements creation time; in conjunction with a check that Spark is running in local mode, it will keep the execution plans unchanged if the value is false.

Looking forward to any comments and feedback.

was:
The same description, with "will keep the execution plan unchanged" in the singular.
[jira] [Updated] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally
[ https://issues.apache.org/jira/browse/SPARK-37536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-37536:

Description:
We have been using Spark in local mode as a small embedded, in-memory SQL DB for microservices. Spark's powerful SQL features and flexibility enable developers to build efficient data-querying solutions. Because our solution deals with small datasets that must be queried through SQL at very low latencies, we found the embedded approach a very good model.

We found through experimentation that Spark in local mode gains significant performance improvements (on average between 20-30%) by disabling the shuffling on aggregation operations. The shuffling is introduced by expanding the query execution plan with ShuffleExchangeExec or BroadcastExchangeExec nodes.

I will be raising a PR to propose introducing a new configuration variable *spark.sql.localMode.shuffle.enabled*. This variable will default to true and will be checked at QueryExecution EnsureRequirements creation time; in conjunction with a check that Spark is running in local mode, it will keep the execution plan unchanged if the value is false.

Looking forward to any comments and feedback.

was:
The same description, but opening with "as a small embedded, in-memory SQL DB for our microservice" rather than "for microservices".
[jira] [Updated] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally
[ https://issues.apache.org/jira/browse/SPARK-37536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-37536:

Description:
We have been using Spark in local mode as a small embedded, in-memory SQL DB for our microservice. Spark's powerful SQL features and flexibility enable developers to build efficient data-querying solutions. Because our solution deals with small datasets that must be queried through SQL at very low latencies, we found the embedded approach a very good model.

We found through experimentation that Spark in local mode gains significant performance improvements (on average between 20-30%) by disabling the shuffling on aggregation operations. The shuffling is introduced by expanding the query execution plan with ShuffleExchangeExec or BroadcastExchangeExec nodes.

I will be raising a PR to propose introducing a new configuration variable *spark.sql.localMode.shuffle.enabled*. This variable will default to true and will be checked at QueryExecution EnsureRequirements creation time; in conjunction with a check that Spark is running in local mode, it will keep the execution plan unchanged if the value is false.

Looking forward to any comments and feedback.

was:
The same description, but proposing the configuration variable under its earlier name, *spark.shuffle.local.enabled*.
[jira] [Updated] (SPARK-36986) Improving schema filtering flexibility
[ https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-36986:

Description:
Our Spark usage requires us to build an external schema and pass it in while creating a Dataset. While working through this, I found an optimization that would greatly improve Spark's flexibility when querying an externally managed schema.

Scope: the ability to retrieve a field's index and schema in one single call, returning a tuple of index and field. This means extending the StructType class with an additional method.

This is what the function would look like:

  /**
   * Returns the index and field structure by name.
   * If it doesn't find it, returns None.
   * Avoids two client calls/loops to obtain consolidated field info.
   */
  def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
    val field = nameToField.get(name)
    if (field.isDefined) {
      Some((fieldIndex(name), field.get))
    } else {
      None
    }
  }

This is particularly useful from an efficiency perspective when we're parsing a JSON structure and want to check, for every field, the name and field type already defined in the schema.

I will create a corresponding branch for PR review, assuming there are no concerns with the above proposal.

was:
The same description, but opening with "I found a couple of optimizations would improve greatly Spark's flexibility to handle external schema management."

> Improving schema filtering flexibility
> --------------------------------------
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Rodrigo Boavida
> Priority: Major
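For comparison, a sketch of the two-step lookup the proposed method would consolidate, written against the existing public StructType API. The standalone helper `getIndexAndFieldByName` below is a hypothetical free function along the lines of the ticket's proposal, not part of Spark:

```scala
import org.apache.spark.sql.types._

object FieldLookupSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)
    ))

    // Today: two separate traversals of the schema for one logical lookup.
    val field: Option[StructField] = schema.fields.find(_.name == "name")
    val index: Int = schema.fieldIndex("name") // throws IllegalArgumentException if absent

    // Single-call equivalent of the proposed StructType method (hypothetical helper):
    def getIndexAndFieldByName(st: StructType, name: String): Option[(Int, StructField)] =
      st.fields.zipWithIndex
        .find { case (f, _) => f.name == name }
        .map { case (f, i) => (i, f) }

    assert(field.isDefined && index == 1)
    assert(getIndexAndFieldByName(schema, "name").exists(_._1 == 1))
    assert(getIndexAndFieldByName(schema, "missing").isEmpty)
  }
}
```

The proposal puts this logic inside StructType itself, where the private `nameToField` map makes the field lookup O(1) instead of a linear scan.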
[jira] [Updated] (SPARK-36986) Improving schema filtering flexibility
[ https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-36986:

Summary: Improving schema filtering flexibility (was: Improving external schema management flexibility)

> Improving schema filtering flexibility
> --------------------------------------
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Rodrigo Boavida
> Priority: Major
[jira] [Updated] (SPARK-36986) Improving external schema management flexibility
[ https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-36986:

Docs Text:
Schema management improvements
1 - Retrieving a field name and type from a schema based on its index

was:
Schema management improvements
1 - Retrieving a field name and type from a schema based on its index
2 - Allowing external Dataset schemas to be provided, as well as their externally generated rows.

> Improving external schema management flexibility
> ------------------------------------------------
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Rodrigo Boavida
> Priority: Major
[jira] [Updated] (SPARK-36986) Improving external schema management flexibility
[ https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-36986:

Description:
Our Spark usage requires us to build an external schema and pass it in while creating a Dataset. While working through this, I found a couple of optimizations that would greatly improve Spark's flexibility when handling externally managed schemas.

Scope: the ability to retrieve a field's index and schema in one single call, returning a tuple of index and field. This means extending the StructType class with an additional method.

This is what the function would look like:

  /**
   * Returns the index and field structure by name.
   * If it doesn't find it, returns None.
   * Avoids two client calls/loops to obtain consolidated field info.
   */
  def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
    val field = nameToField.get(name)
    if (field.isDefined) {
      Some((fieldIndex(name), field.get))
    } else {
      None
    }
  }

This is particularly useful from an efficiency perspective when we're parsing a JSON structure and want to check, for every field, the name and field type already defined in the schema.

I will create a corresponding branch for PR review, assuming there are no concerns with the above proposal.

was:
Our Spark usage requires us to build an external schema and pass it in while creating a Dataset. While working through this, I found a couple of optimizations that would greatly improve Spark's flexibility when handling externally managed schemas.

1 - The ability to retrieve a field's index and schema in one single call, returning a tuple of index and field. This means extending the StructType class with the getIndexAndFieldByName method shown above, which is particularly useful from an efficiency perspective when we're parsing a JSON structure and want to check, for every field, the name and field type already defined in the schema.

2 - Allowing a Dataset to be created from a schema, passing the corresponding internal rows whose internal types map to the externally defined schema. This allows Spark fields to be created from any data structure without depending on Spark's internal conversions (in particular for JSON parsing), and improves performance by skipping the CatalystTypeConverters step of converting native Java types into Spark types.

This is what the function would look like:

  /**
   * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
   * the caller to create the InternalRow set externally, as well as define the schema externally.
   *
   * @since 3.3.0
   */
  def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
    val attributes = schema.toAttributes
    val plan = LogicalRDD(attributes, data)(self)
    val qe = sessionState.executePlan(plan)
    qe.assertAnalyzed()
    new Dataset[Row](this, plan, RowEncoder(schema))
  }

This is similar to:

  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

but doesn't depend on Spark internally creating the RDD by inferring it, for example, from a JSON structure - which is not useful when we're managing the schema externally. It also skips the Catalyst conversions and the corresponding object overhead, making internal row generation much more efficient by doing it explicitly from the caller.

I will create a corresponding branch for PR review, assuming there are no concerns with the above proposals.

> Improving external schema management flexibility
> ------------------------------------------------
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Rodrigo Boavida
> Priority: Major
[jira] [Updated] (SPARK-36986) Improving external schema management flexibility
[ https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-36986:

Priority: Major (was: Minor)

> Improving external schema management flexibility
> ------------------------------------------------
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Rodrigo Boavida
> Priority: Major
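A sketch of how a caller might use the proposed `createDataset(RDD[InternalRow], StructType)` overload. The overload itself is the ticket's proposal and does not exist in Spark's public API; `InternalRow`, `GenericInternalRow`, and `UTF8String` are existing Spark-internal types, and building rows with them directly is what lets the caller skip the CatalystTypeConverters step:

```scala
// Sketch: assumes the SparkSession.createDataset(RDD[InternalRow], StructType)
// overload proposed in SPARK-36986; it is not part of released Spark.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object ExternalRowsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("external-rows")
      .getOrCreate()

    // Schema managed entirely outside Spark's inference machinery.
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType)
    ))

    // Rows built directly in Spark's internal representation
    // (note UTF8String, not java.lang.String, for string columns).
    val rows = spark.sparkContext.parallelize(Seq[InternalRow](
      new GenericInternalRow(Array[Any](1L, UTF8String.fromString("alpha"))),
      new GenericInternalRow(Array[Any](2L, UTF8String.fromString("beta")))
    ))

    // Proposed call; today this shape of construction is only reachable
    // through non-public SparkSession internals.
    // val df = spark.createDataset(rows, schema)
    // df.show()
  }
}
```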
[jira] [Created] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally
Rodrigo Boavida created SPARK-37536: --- Summary: Allow for API user to disable Shuffle Operations while running locally Key: SPARK-37536 URL: https://issues.apache.org/jira/browse/SPARK-37536 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 3.3.0 Environment: Spark running in local mode Reporter: Rodrigo Boavida

We have been using Spark in local mode as a small embedded, in-memory SQL DB for our microservice. Spark's powerful SQL features and flexibility enable developers to build efficient data querying solutions. Because our solution deals with small datasets that need to be queried through SQL at very low latencies, we found the embedded approach a very good model.

We found through experimentation that Spark in local mode gains significant performance improvements (on average between 20-30%) by disabling the shuffling on aggregation operations. This shuffling is introduced by expanding the query execution plan with ShuffleExchangeExec or BroadcastExchangeExec.

I will be raising a PR to propose introducing a new configuration variable *spark.shuffle.local.enabled*. This variable will default to true and will be checked at QueryExecution EnsureRequirements creation time; in conjunction with a check that Spark is running in local mode, it will keep the execution plan unchanged if the value is false.

Looking forward to any comments and feedback.

-- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
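As a sketch of how such a gate might behave: the code below is a hypothetical, dependency-free model, not the real EnsureRequirements code. Conf, shouldAddShuffleExchanges, and the master-URL check are illustrative stand-ins. The idea is that exchanges are inserted as usual unless both conditions hold: the master URL indicates local mode and the proposed flag is explicitly set to false.

```scala
// Hypothetical model of the proposed check; not actual Spark internals.
// Conf stands in for a SparkConf/SQLConf-style boolean lookup with a default.
case class Conf(settings: Map[String, String]) {
  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

// Shuffle exchanges are inserted as usual unless Spark runs in local mode
// AND the proposed flag has been explicitly disabled.
def shouldAddShuffleExchanges(conf: Conf, master: String): Boolean = {
  val isLocalMode = master.startsWith("local")
  val localShuffleEnabled =
    conf.getBoolean("spark.shuffle.local.enabled", default = true)
  !(isLocalMode && !localShuffleEnabled)
}
```

With the default of true, behaviour is unchanged everywhere; only an explicit opt-out while running in local mode would suppress exchange insertion.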
[jira] [Updated] (SPARK-36986) Improving external schema management flexibility
[ https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rodrigo Boavida updated SPARK-36986: Description:

Our Spark usage requires us to build an external schema and pass it in while creating a Dataset. While working through this, I found a couple of optimizations that would greatly improve Spark's flexibility for external schema management.

1 - The ability to retrieve a field's index and structure by name in a single call, returning a tuple. This means extending the StructType class with an additional method. This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find the field, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
 */
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
  val field = nameToField.get(name)
  if (field.isDefined) { Some((fieldIndex(name), field.get)) }
  else { None }
}

This is particularly useful from an efficiency perspective when we're parsing a JSON structure and want to check, for every field, the name and field type already defined in the schema.

2 - Allowing a Dataset to be created from a schema, passing the corresponding internal rows whose internal types map to the schema already defined externally. This allows Spark fields to be created from any data structure, without depending on Spark's internal conversions (in particular for JSON parsing), and improves performance by skipping the CatalystTypeConverters work of converting native Java types into Spark types. This is what the function would look like:

/**
 * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
 * the caller to create the InternalRow set externally, as well as define the schema externally.
 *
 * @since 3.3.0
 */
def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
  val attributes = schema.toAttributes
  val plan = LogicalRDD(attributes, data)(self)
  val qe = sessionState.executePlan(plan)
  qe.assertAnalyzed()
  new Dataset[Row](this, plan, RowEncoder(schema))
}

This is similar to this existing function:

def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

but doesn't depend on Spark internally creating the RDD by inferring it, for example, from a JSON structure, which is not useful if we're managing the schema externally. It also skips the Catalyst conversions and corresponding object overhead, making internal row generation much more efficient by doing it explicitly from the caller.

I will create a corresponding branch for PR review, assuming that there are no concerns with the above proposals.

was:

Our Spark usage requires us to build an external schema and pass it in while creating a Dataset. While working through this, I found a couple of optimizations that would greatly improve Spark's flexibility for external schema management.

1 - The ability to retrieve a field's index and structure by name in a single call, returning a tuple. This means extending the StructType class with an additional method. This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find the field, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
 */
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
  val field = nameToField.get(name)
  if (field.isDefined) { Some((fieldIndex(name), field.get)) }
  else { None }
}

This is particularly useful from an efficiency perspective when we're parsing a JSON structure and want to check, for every field, the name and field type already defined in the schema.

2 - Allowing a Dataset to be created from a schema, passing the corresponding internal rows whose internal types map to the schema already defined externally. this greatly improves the performance by skipping This is what the function would look like:

/**
 * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
 * the caller to create the InternalRow set externally, as well as define the schema externally.
 *
 * @since 3.3.0
 */
def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
  val attributes = schema.toAttributes
  val plan = LogicalRDD(attributes, data)(self)
  val qe = sessionState.executePlan(plan)
  qe.assertAnalyzed()
  new Dataset[Row](this, plan, RowEncoder(schema))
}

This is similar to this existing function:

def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

but doesn't depend on Spark internally creating the RDD by inferring it, for example, from a JSON structure, which is not useful if we're managing the schema externally. Also skips the Catalyst conversions and
[jira] [Created] (SPARK-36986) Improving external schema management flexibility
Rodrigo Boavida created SPARK-36986: --- Summary: Improving external schema management flexibility Key: SPARK-36986 URL: https://issues.apache.org/jira/browse/SPARK-36986 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: Rodrigo Boavida

Our Spark usage requires us to build an external schema and pass it in while creating a Dataset. While working through this, I found a couple of optimizations that would greatly improve Spark's flexibility for external schema management.

1 - The ability to retrieve a field's index and structure by name in a single call, returning a tuple. This means extending the StructType class with an additional method. This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find the field, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
 */
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
  val field = nameToField.get(name)
  if (field.isDefined) { Some((fieldIndex(name), field.get)) }
  else { None }
}

This is particularly useful from an efficiency perspective when we're parsing a JSON structure and want to check, for every field, the name and field type already defined in the schema.

2 - Allowing a Dataset to be created from a schema, passing the corresponding internal rows whose internal types map to the schema already defined externally. this greatly improves the performance by skipping This is what the function would look like:

/**
 * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
 * the caller to create the InternalRow set externally, as well as define the schema externally.
 *
 * @since 3.3.0
 */
def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
  val attributes = schema.toAttributes
  val plan = LogicalRDD(attributes, data)(self)
  val qe = sessionState.executePlan(plan)
  qe.assertAnalyzed()
  new Dataset[Row](this, plan, RowEncoder(schema))
}

This is similar to this existing function:

def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

but doesn't depend on Spark internally creating the RDD by inferring it, for example, from a JSON structure, which is not useful if we're managing the schema externally. It also skips the Catalyst conversions and corresponding object overhead, making internal row generation much more efficient by doing it explicitly from the caller.

I will create a corresponding branch for PR review, assuming that there are no concerns with the above proposals.

-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2629) Improved state management for Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117404#comment-15117404 ] Rodrigo Boavida commented on SPARK-2629:

Hi, I've experimented with the new API method but am struggling to find an option to get the updateStateByKey behaviour that forces all elements to be recomputed on every batch (this is a requirement for my application, as I am calculating duration fields based on the streaming intervals and updating an external store for every element). It seems this function does not offer an option to compute all elements, and that this is by design, judging by the design doc. Could someone please clarify? tnks, Rod

> Improved state management for Spark Streaming
> -
>
> Key: SPARK-2629
> URL: https://issues.apache.org/jira/browse/SPARK-2629
> Project: Spark
> Issue Type: Epic
> Components: Streaming
> Affects Versions: 0.9.2, 1.0.2, 1.2.2, 1.3.1, 1.4.1, 1.5.1
> Reporter: Tathagata Das
> Assignee: Tathagata Das
>
> The current updateStateByKey provides stateful processing in Spark Streaming. It allows the user to maintain per-key state and manage that state using an updateFunction. The updateFunction is called for each key, and it uses new data and the existing state of the key to generate an updated state. However, based on community feedback, we have learnt the following lessons:
> - Need for more optimized state management that does not scan every key
> - Need to make it easier to implement common use cases - (a) timeout of idle data, (b) returning items other than state
> The high-level idea that I am proposing is:
> - Introduce a new API -trackStateByKey- *mapWithState* that allows the user to update per-key state and emit arbitrary records. The new API is necessary as this will have significantly different semantics than the existing updateStateByKey API. This API will have direct support for timeouts.
> - Internally, the system will keep the state data as a map/list within the partitions of the state RDDs. The new data RDDs will be partitioned appropriately, and for all the key-value data, it will look up the map/list in the state RDD partition and create a new list/map of updated state data. The new state RDD partition will be created based on the updated data and, if necessary, the old data.
> Here is the detailed design doc (*outdated, to be updated*). Please take a look and provide feedback as comments.
> https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
[ https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15058083#comment-15058083 ] Rodrigo Boavida commented on SPARK-12219:

Just got everything running perfectly on the cluster on the latest 1.6 SNAPSHOT. I will start using 1.6 and above. Thanks for the help checking it. Tnks, Rod

> Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
> -
>
> Key: SPARK-12219
> URL: https://issues.apache.org/jira/browse/SPARK-12219
> Project: Spark
> Issue Type: Bug
> Components: Build
> Affects Versions: 1.5.2
> Reporter: Rodrigo Boavida
>
> I've tried without success to build Spark on Scala 2.11.7. I'm getting build errors using sbt due to the issues found in the thread below from July of this year.
> https://mail-archives.apache.org/mod_mbox/spark-dev/201507.mbox/%3CCA+3qhFSJGmZToGmBU1=ivy7kr6eb7k8t6dpz+ibkstihryw...@mail.gmail.com%3E
> It seems some minor fixes are needed to make the Scala 2.11 compiler happy.
> I needed to build with SBT, as suggested in the thread below, to get past an apparent maven shade plugin issue which changed some classes when I changed to akka 2.4.0.
> https://groups.google.com/forum/#!topic/akka-user/iai6whR6-xU
> I've set this bug to Major priority assuming that the Spark community wants to keep fully supporting SBT builds, including Scala 2.11 compatibility.
> Tnks,
> Rod

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12178) Expose reporting of StreamInputInfo for custom made streams
[ https://issues.apache.org/jira/browse/SPARK-12178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15058121#comment-15058121 ] Rodrigo Boavida commented on SPARK-12178:

I plan to make my akka direct stream implementation open source, but this would be absolutely necessary to have it complete. I heard there is someone working on a flume-based implementation of a direct stream, and I'm sure other streaming engines will follow soon. Is there something I could do to push this forward? I don't mind being the one doing the change. Tnks, Rod

> Expose reporting of StreamInputInfo for custom made streams
> -
>
> Key: SPARK-12178
> URL: https://issues.apache.org/jira/browse/SPARK-12178
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Reporter: Rodrigo Boavida
> Priority: Minor
>
> For custom made direct streams, the Spark Streaming context needs to be informed of the RDD count per batch execution. This is not exposed by the InputDStream abstract class.
> The suggestion is to create a method in the InputDStream class that reports to the streaming context and make that available to child classes of InputDStream.
> Signature example:
> def reportInfo(validTime: org.apache.spark.streaming.Time, inputInfo: org.apache.spark.streaming.scheduler.StreamInputInfo)
> I have already done this on my own private branch. I can merge that change in if approval is given.

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
[ https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15057133#comment-15057133 ] Rodrigo Boavida commented on SPARK-12219:

[~srowen] Just built and ran the 1.6 branch successfully off the cluster. Going to run it on the cluster tomorrow just to double-check that the runtime is in good condition as well, and will let you know. Tnks, Rod

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
[ https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15056180#comment-15056180 ] Rodrigo Boavida commented on SPARK-12219:

Sean - that's great news. Unfortunately I haven't had time to check on the errors. I will get the latest copy of 1.6, build it again and let you know. Cheers, Rod

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
[ https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049168#comment-15049168 ] Rodrigo Boavida commented on SPARK-12219:

I could look into that tomorrow morning (my day is ending over in Ireland :)). Could you give me some hints on how to proceed? It would be my first contribution to Spark. Tnks, Rod

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
[ https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049044#comment-15049044 ] Rodrigo Boavida commented on SPARK-12219:

Hi Sean, I've proceeded as instructed in the documentation. I applied the Scala version change script with 2.11.7, and also changed the Scala version in the main pom.xml to 2.11.7, which is not done by the script.

These are the errors I get when I run "build/sbt -Pyarn -Phadoop-2.3 -Dscala-2.11 assembly":

[error] /home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala:40: no valid targets for annotation on value rdd - it is discarded unused. You may specify targets with meta-annotations, e.g. @(transient @param)
[error] @transient rdd: RDD[T],
[error]
[error] /home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala:42: no valid targets for annotation on value parentRddPartitionIndex - it is discarded unused. You may specify targets with meta-annotations, e.g. @(transient @param)
[error] @transient parentRddPartitionIndex: Int)
[error]
[error] /home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala:35: no valid targets for annotation on value partitionFilterFunc - it is discarded unused. You may specify targets with meta-annotations, e.g. @(transient @param)
[error] private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
[error]
[error] /home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala:58: no valid targets for annotation on value prev - it is discarded unused. You may specify targets with meta-annotations, e.g. @(transient @param)
[error] @transient prev: RDD[T],
[error]
[error] /home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala:59: no valid targets for annotation on value partitionFilterFunc - it is discarded unused. You may specify targets with meta-annotations, e.g. @(transient @param)
[error] @transient partitionFilterFunc: Int => Boolean)

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
Rodrigo Boavida created SPARK-12219: --- Summary: Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly Key: SPARK-12219 URL: https://issues.apache.org/jira/browse/SPARK-12219 Project: Spark Issue Type: Bug Affects Versions: 1.5.2 Reporter: Rodrigo Boavida

I've tried without success to build Spark on Scala 2.11.7. I'm getting build errors using sbt due to the issues found in the thread below from July of this year.

https://mail-archives.apache.org/mod_mbox/spark-dev/201507.mbox/%3CCA+3qhFSJGmZToGmBU1=ivy7kr6eb7k8t6dpz+ibkstihryw...@mail.gmail.com%3E

It seems some minor fixes are needed to make the Scala 2.11 compiler happy. I needed to build with SBT, as suggested in the thread below, to get past an apparent maven shade plugin issue which changed some classes when I changed to akka 2.4.0.

https://groups.google.com/forum/#!topic/akka-user/iai6whR6-xU

I've set this bug to Major priority assuming that the Spark community wants to keep fully supporting SBT builds, including Scala 2.11 compatibility. Tnks, Rod

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12178) Expose reporting of StreamInputInfo for custom made streams
[ https://issues.apache.org/jira/browse/SPARK-12178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15044846#comment-15044846 ] Rodrigo Boavida commented on SPARK-12178:

For any new implementation of a custom stream. For example, the KafkaDirectInputDStream is a custom stream with its own compute method and its own way of calculating the StreamInputInfo, feeding the ingestion rate and information into the Spark Streaming context differently than the ReceiverInputDStream. I'm currently implementing a DStream similar to the KafkaDirectStream which feeds on Akka to retrieve data from each worker, so the ingestion reporting needs to be custom made as well. If we don't have this reporting function exposed, the Spark Streaming page will not be able to show us the events/sec rate. I hope this helps to understand the requirement. tnks, Rod

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-12178) Expose reporting of StreamInputInfo for custom made streams
Rodrigo Boavida created SPARK-12178: --- Summary: Expose reporting of StreamInputInfo for custom made streams Key: SPARK-12178 URL: https://issues.apache.org/jira/browse/SPARK-12178 Project: Spark Issue Type: Improvement Components: Streaming Reporter: Rodrigo Boavida Priority: Minor For custom made direct streams, the Spark Streaming context needs to be informed of the RDD count per batch execution. This is not exposed by the InputDStream abstract class. The suggestion is to create a method in the InputDStream class that reports to the streaming context and make that available to child classes of InputDStream. Signature example: def reportInfo(validTime : org.apache.spark.streaming.Time, inputInfo : org.apache.spark.streaming.scheduler.StreamInputInfo) I have already done this on my own private branch. I can merge that change in if approval is given. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
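To make the proposal concrete, here is a small, dependency-free sketch of the idea. StreamingContextModel, InputDStreamModel, Time, and StreamInputInfo below are simplified hypothetical stand-ins for the Spark Streaming classes, not the real API. The point is that the abstract base class exposes a protected reporting hook that custom direct streams can call once per batch.

```scala
// Dependency-free model of the proposed reporting hook; not Spark API.
case class Time(ms: Long)
case class StreamInputInfo(inputStreamId: Int, numRecords: Long)

// Stand-in for the streaming context that collects per-batch reports.
class StreamingContextModel {
  var reported: List[(Time, StreamInputInfo)] = Nil
  def reportInfo(validTime: Time, info: StreamInputInfo): Unit =
    reported = (validTime, info) :: reported
}

// The proposal: the InputDStream base class makes a reporting method
// available to its child classes.
abstract class InputDStreamModel(ssc: StreamingContextModel, id: Int) {
  protected def reportInfo(validTime: Time, numRecords: Long): Unit =
    ssc.reportInfo(validTime, StreamInputInfo(id, numRecords))
}

// A custom direct stream then reports its own per-batch record count,
// which is what would drive an events/sec rate in the streaming UI.
class AkkaDirectStreamModel(ssc: StreamingContextModel)
    extends InputDStreamModel(ssc, 0) {
  def compute(validTime: Time, batch: Seq[String]): Seq[String] = {
    reportInfo(validTime, batch.size.toLong)
    batch
  }
}
```

Under this sketch, each custom compute implementation decides how to count its batch, while the context-facing reporting path stays uniform.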
[jira] [Commented] (SPARK-12178) Expose reporting of StreamInputInfo for custom made streams
[ https://issues.apache.org/jira/browse/SPARK-12178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15044850#comment-15044850 ] Rodrigo Boavida commented on SPARK-12178:

This is probably the first step in the effort of making the concept of direct streams generic and reusable for different technologies (not just Kafka). The Reactive Streams concept is an example of a further step. I'd like to tag here Prakash Chockalingam from Databricks, with whom I had this conversation at the latest Spark Summit, but I can't find his user name. CCing Iulian as well [~dragos] Tnks, Rod

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10420) Implementing Reactive Streams based Spark Streaming Receiver
[ https://issues.apache.org/jira/browse/SPARK-10420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14989521#comment-14989521 ] Rodrigo Boavida commented on SPARK-10420:

Hi all, I've touched base about this with Luc, Julian and Prakash at the Spark Summit EU. An RS implementation is extremely important for a distributed system, so that it can keep data consumption at its own pace without risking overwhelming the available resources and hitting failure conditions such as OOM. My company has extreme interest in this feature, given we are implementing a new streaming architecture.

The current implementation of Spark Streaming has the Receiver as the entry point for data, which can easily become a constraint, as failures or slowdowns of the corresponding JVM can impact the whole Spark Streaming application dramatically. When the data stream allows partitioning the feed (like Kafka, or Akka with sharding), making the receiver a built-in executor concept, where each executor has its own slice of the feed, leverages the parallel processing nature of Spark. This is already done with the Kafka direct stream.

I would be very interested in this feature, in particular without a receiver and with a direct stream approach (like the Kafka one) where each executor could subscribe directly, based on whatever the type of streaming context is. For example: for an Akka-based RS stream, given a sharding function passed in the streaming context. This would also potentially address the currently known issue regarding Mesos and dynamic allocation, where Mesos could bring down the executor on which the Receiver is running, thus stopping the whole stream. If there is no Receiver, the stream wouldn't have to stop, and it would be up to each streaming context to define how streaming slices would be redistributed. For example: with an Akka-based RS streaming context I assume this could be easily achieved.
I intend to create two tickets - one for a generic RS abstraction layer on the executors and another for a specific Akka RS-based streaming context. I will tag a few people on both; any comments will be appreciated! Tnks, Rod

> Implementing Reactive Streams based Spark Streaming Receiver
> -
>
> Key: SPARK-10420
> URL: https://issues.apache.org/jira/browse/SPARK-10420
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Reporter: Nilanjan Raychaudhuri
> Priority: Minor
>
> Hello TD,
> This is probably the last bit of the back-pressure story, implementing ReactiveStreams based Spark streaming receivers. After discussing this with my Typesafe team, we came up with the following design document:
> https://docs.google.com/document/d/1lGQKXfNznd5SPuQigvCdLsudl-gcvWKuHWr0Bpn3y30/edit?usp=sharing
> Could you please take a look at this when you get a chance?
> Thanks
> Nilanjan

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org