[jira] [Updated] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally

2022-02-06 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-37536:

Description: 
We have been using Spark in local mode as a small embedded, in-memory SQL 
database for microservices.

Spark's powerful SQL features and flexibility enable developers to build 
efficient data-querying solutions. Because our solution deals with small datasets 
that must be queried through SQL at very low latencies, we found the embedded 
approach a very good model.

Through experimentation we found that Spark in local mode gains significant 
performance improvements (on average 20-30%) by disabling the shuffling on 
aggregation operations (ShuffleExchangeExec).

I will be raising a PR to propose introducing a new configuration variable:

*spark.sql.localMode.shuffle.enabled*

This variable will default to true and will be checked at EnsureRequirements 
creation time in QueryExecution; combined with a check that Spark is running in 
local mode, it will keep the execution plans unchanged if the value is false.
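
A rough sketch of the intended guard, purely illustrative: the object and method 
names below are assumptions and not the actual patch, which will come with the PR.

import org.apache.spark.SparkConf

// Illustrative only: if the master is local and the proposed flag is false,
// EnsureRequirements would leave the plan as-is instead of inserting
// ShuffleExchangeExec nodes. Object/method names here are hypothetical.
object LocalShuffleGuard {
  val LocalShuffleEnabledKey = "spark.sql.localMode.shuffle.enabled"

  def skipShuffleInsertion(conf: SparkConf): Boolean = {
    val isLocalMaster = conf.get("spark.master", "").startsWith("local")
    val shuffleEnabled = conf.getBoolean(LocalShuffleEnabledKey, defaultValue = true)
    isLocalMaster && !shuffleEnabled
  }
}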

Looking forward to any comments and feedback.

  was:
We have been using Spark in local mode as a small embedded, in-memory SQL 
database for microservices.

Spark's powerful SQL features and flexibility enable developers to build 
efficient data-querying solutions. Because our solution deals with small datasets 
that must be queried through SQL at very low latencies, we found the embedded 
approach a very good model.

Through experimentation we found that Spark in local mode gains significant 
performance improvements (on average 20-30%) by disabling the shuffling on 
aggregation operations. The shuffling is introduced by expanding the query 
execution plan with ShuffleExchangeExec or BroadcastExchangeExec.

I will be raising a PR to propose introducing a new configuration variable:

*spark.sql.localMode.shuffle.enabled*

This variable will default to true and will be checked at EnsureRequirements 
creation time in QueryExecution; combined with a check that Spark is running in 
local mode, it will keep the execution plans unchanged if the value is false.

Looking forward to any comments and feedback.


> Allow for API user to disable Shuffle Operations while running locally
> --
>
> Key: SPARK-37536
> URL: https://issues.apache.org/jira/browse/SPARK-37536
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.0
> Environment: Spark running in local mode
>Reporter: Rodrigo Boavida
>Priority: Major
>
> We have been using Spark in local mode as a small embedded, in-memory SQL 
> database for microservices.
> Spark's powerful SQL features and flexibility enable developers to build 
> efficient data-querying solutions. Because our solution deals with small 
> datasets that must be queried through SQL at very low latencies, we found the 
> embedded approach a very good model.
> Through experimentation we found that Spark in local mode gains significant 
> performance improvements (on average 20-30%) by disabling the shuffling on 
> aggregation operations (ShuffleExchangeExec).
> I will be raising a PR to propose introducing a new configuration variable:
> *spark.sql.localMode.shuffle.enabled*
> This variable will default to true and will be checked at EnsureRequirements 
> creation time in QueryExecution; combined with a check that Spark is running in 
> local mode, it will keep the execution plans unchanged if the value is false.
> Looking forward to any comments and feedback.






[jira] [Updated] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally

2022-02-05 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-37536:

Description: 
We have been using Spark in local mode as a small embedded, in-memory SQL 
database for microservices.

Spark's powerful SQL features and flexibility enable developers to build 
efficient data-querying solutions. Because our solution deals with small datasets 
that must be queried through SQL at very low latencies, we found the embedded 
approach a very good model.

Through experimentation we found that Spark in local mode gains significant 
performance improvements (on average 20-30%) by disabling the shuffling on 
aggregation operations. The shuffling is introduced by expanding the query 
execution plan with ShuffleExchangeExec or BroadcastExchangeExec.

I will be raising a PR to propose introducing a new configuration variable:

*spark.sql.localMode.shuffle.enabled*

This variable will default to true and will be checked at EnsureRequirements 
creation time in QueryExecution; combined with a check that Spark is running in 
local mode, it will keep the execution plans unchanged if the value is false.

Looking forward to any comments and feedback.

  was:
We have been using Spark in local mode as a small embedded, in-memory SQL 
database for microservices.

Spark's powerful SQL features and flexibility enable developers to build 
efficient data-querying solutions. Because our solution deals with small datasets 
that must be queried through SQL at very low latencies, we found the embedded 
approach a very good model.

Through experimentation we found that Spark in local mode gains significant 
performance improvements (on average 20-30%) by disabling the shuffling on 
aggregation operations. The shuffling is introduced by expanding the query 
execution plan with ShuffleExchangeExec or BroadcastExchangeExec.

I will be raising a PR to propose introducing a new configuration variable:

*spark.sql.localMode.shuffle.enabled*

This variable will default to true and will be checked at EnsureRequirements 
creation time in QueryExecution; combined with a check that Spark is running in 
local mode, it will keep the execution plan unchanged if the value is false.

Looking forward to any comments and feedback.


> Allow for API user to disable Shuffle Operations while running locally
> --
>
> Key: SPARK-37536
> URL: https://issues.apache.org/jira/browse/SPARK-37536
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.0
> Environment: Spark running in local mode
>Reporter: Rodrigo Boavida
>Priority: Major
>
> We have been using Spark in local mode as a small embedded, in-memory SQL 
> database for microservices.
> Spark's powerful SQL features and flexibility enable developers to build 
> efficient data-querying solutions. Because our solution deals with small 
> datasets that must be queried through SQL at very low latencies, we found the 
> embedded approach a very good model.
> Through experimentation we found that Spark in local mode gains significant 
> performance improvements (on average 20-30%) by disabling the shuffling on 
> aggregation operations. The shuffling is introduced by expanding the query 
> execution plan with ShuffleExchangeExec or BroadcastExchangeExec.
> I will be raising a PR to propose introducing a new configuration variable:
> *spark.sql.localMode.shuffle.enabled*
> This variable will default to true and will be checked at EnsureRequirements 
> creation time in QueryExecution; combined with a check that Spark is running in 
> local mode, it will keep the execution plans unchanged if the value is false.
> Looking forward to any comments and feedback.






[jira] [Updated] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally

2022-02-05 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-37536:

Description: 
We have been using Spark in local mode as a small embedded, in-memory SQL 
database for microservices.

Spark's powerful SQL features and flexibility enable developers to build 
efficient data-querying solutions. Because our solution deals with small datasets 
that must be queried through SQL at very low latencies, we found the embedded 
approach a very good model.

Through experimentation we found that Spark in local mode gains significant 
performance improvements (on average 20-30%) by disabling the shuffling on 
aggregation operations. The shuffling is introduced by expanding the query 
execution plan with ShuffleExchangeExec or BroadcastExchangeExec.

I will be raising a PR to propose introducing a new configuration variable:

*spark.sql.localMode.shuffle.enabled*

This variable will default to true and will be checked at EnsureRequirements 
creation time in QueryExecution; combined with a check that Spark is running in 
local mode, it will keep the execution plan unchanged if the value is false.

Looking forward to any comments and feedback.

  was:
We have been using Spark in local mode as a small embedded, in-memory SQL 
database for our microservice.

Spark's powerful SQL features and flexibility enable developers to build 
efficient data-querying solutions. Because our solution deals with small datasets 
that must be queried through SQL at very low latencies, we found the embedded 
approach a very good model.

Through experimentation we found that Spark in local mode gains significant 
performance improvements (on average 20-30%) by disabling the shuffling on 
aggregation operations. The shuffling is introduced by expanding the query 
execution plan with ShuffleExchangeExec or BroadcastExchangeExec.

I will be raising a PR to propose introducing a new configuration variable:

*spark.sql.localMode.shuffle.enabled*

This variable will default to true and will be checked at EnsureRequirements 
creation time in QueryExecution; combined with a check that Spark is running in 
local mode, it will keep the execution plan unchanged if the value is false.

Looking forward to any comments and feedback.


> Allow for API user to disable Shuffle Operations while running locally
> --
>
> Key: SPARK-37536
> URL: https://issues.apache.org/jira/browse/SPARK-37536
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.0
> Environment: Spark running in local mode
>Reporter: Rodrigo Boavida
>Priority: Major
>
> We have been using Spark in local mode as a small embedded, in-memory SQL 
> database for microservices.
> Spark's powerful SQL features and flexibility enable developers to build 
> efficient data-querying solutions. Because our solution deals with small 
> datasets that must be queried through SQL at very low latencies, we found the 
> embedded approach a very good model.
> Through experimentation we found that Spark in local mode gains significant 
> performance improvements (on average 20-30%) by disabling the shuffling on 
> aggregation operations. The shuffling is introduced by expanding the query 
> execution plan with ShuffleExchangeExec or BroadcastExchangeExec.
> I will be raising a PR to propose introducing a new configuration variable:
> *spark.sql.localMode.shuffle.enabled*
> This variable will default to true and will be checked at EnsureRequirements 
> creation time in QueryExecution; combined with a check that Spark is running in 
> local mode, it will keep the execution plan unchanged if the value is false.
> Looking forward to any comments and feedback.






[jira] [Updated] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally

2022-02-05 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-37536:

Description: 
We have been using Spark in local mode as a small embedded, in-memory SQL 
database for our microservice.

Spark's powerful SQL features and flexibility enable developers to build 
efficient data-querying solutions. Because our solution deals with small datasets 
that must be queried through SQL at very low latencies, we found the embedded 
approach a very good model.

Through experimentation we found that Spark in local mode gains significant 
performance improvements (on average 20-30%) by disabling the shuffling on 
aggregation operations. The shuffling is introduced by expanding the query 
execution plan with ShuffleExchangeExec or BroadcastExchangeExec.

I will be raising a PR to propose introducing a new configuration variable:

*spark.sql.localMode.shuffle.enabled*

This variable will default to true and will be checked at EnsureRequirements 
creation time in QueryExecution; combined with a check that Spark is running in 
local mode, it will keep the execution plan unchanged if the value is false.

Looking forward to any comments and feedback.

  was:
We have been using Spark in local mode as a small embedded, in-memory SQL 
database for our microservice.

Spark's powerful SQL features and flexibility enable developers to build 
efficient data-querying solutions. Because our solution deals with small datasets 
that must be queried through SQL at very low latencies, we found the embedded 
approach a very good model.

Through experimentation we found that Spark in local mode gains significant 
performance improvements (on average 20-30%) by disabling the shuffling on 
aggregation operations. The shuffling is introduced by expanding the query 
execution plan with ShuffleExchangeExec or BroadcastExchangeExec.

I will be raising a PR to propose introducing a new configuration variable:

*spark.shuffle.local.enabled*

This variable will default to true and will be checked at EnsureRequirements 
creation time in QueryExecution; combined with a check that Spark is running in 
local mode, it will keep the execution plan unchanged if the value is false.

Looking forward to any comments and feedback.


> Allow for API user to disable Shuffle Operations while running locally
> --
>
> Key: SPARK-37536
> URL: https://issues.apache.org/jira/browse/SPARK-37536
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.3.0
> Environment: Spark running in local mode
>Reporter: Rodrigo Boavida
>Priority: Major
>
> We have been using Spark in local mode as a small embedded, in-memory SQL 
> database for our microservice.
> Spark's powerful SQL features and flexibility enable developers to build 
> efficient data-querying solutions. Because our solution deals with small 
> datasets that must be queried through SQL at very low latencies, we found the 
> embedded approach a very good model.
> Through experimentation we found that Spark in local mode gains significant 
> performance improvements (on average 20-30%) by disabling the shuffling on 
> aggregation operations. The shuffling is introduced by expanding the query 
> execution plan with ShuffleExchangeExec or BroadcastExchangeExec.
> I will be raising a PR to propose introducing a new configuration variable:
> *spark.sql.localMode.shuffle.enabled*
> This variable will default to true and will be checked at EnsureRequirements 
> creation time in QueryExecution; combined with a check that Spark is running in 
> local mode, it will keep the execution plan unchanged if the value is false.
> Looking forward to any comments and feedback.






[jira] [Updated] (SPARK-36986) Improving schema filtering flexibility

2022-02-05 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-36986:

Description: 
Our Spark usage requires us to build an external schema and pass it in while 
creating a DataSet.

While working through this, I found an optimization that would greatly improve 
Spark's flexibility for querying externally managed schemas.

Scope: the ability to retrieve a field's index and definition in a single call, 
looked up by name and returned as a tuple.

This means extending the StructType class with an additional method.

This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find it, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
 *
 */
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
  val field = nameToField.get(name)
  if (field.isDefined) {
    Some((fieldIndex(name), field.get))
  } else {
    None
  }
}

This is particularly useful from an efficiency perspective when we're parsing a 
JSON structure and, for every field, want to check the name and type already 
defined in the schema.

I will create a corresponding branch for PR review, assuming there are no 
concerns with the above proposal.
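
A quick usage sketch (the schema below is made up for illustration). Today the 
same lookup takes two passes over the existing API, which is exactly what the 
proposed method avoids:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

// With the proposal, one call would return both pieces:
//   schema.getIndexAndFieldByName("name")  // Some((1, StructField("name", StringType, true)))

// Without it, the caller needs a manual scan (or two lookups) for the same info:
val indexAndField: Option[(Int, StructField)] =
  schema.fields.zipWithIndex
    .find { case (f, _) => f.name == "name" }
    .map { case (f, i) => (i, f) }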

 

  was:
Our Spark usage requires us to build an external schema and pass it in while 
creating a DataSet.

While working through this, I found a couple of optimizations that would greatly 
improve Spark's flexibility for handling external schema management.

Scope: the ability to retrieve a field's index and definition in a single call, 
looked up by name and returned as a tuple.

This means extending the StructType class with an additional method.

This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find it, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
 *
 */
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
  val field = nameToField.get(name)
  if (field.isDefined) {
    Some((fieldIndex(name), field.get))
  } else {
    None
  }
}

This is particularly useful from an efficiency perspective when we're parsing a 
JSON structure and, for every field, want to check the name and type already 
defined in the schema.

I will create a corresponding branch for PR review, assuming there are no 
concerns with the above proposal.

 


> Improving schema filtering flexibility
> --
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Rodrigo Boavida
>Priority: Major
>
> Our Spark usage requires us to build an external schema and pass it in while 
> creating a DataSet.
> While working through this, I found an optimization that would greatly improve 
> Spark's flexibility for querying externally managed schemas.
> Scope: the ability to retrieve a field's index and definition in a single call, 
> looked up by name and returned as a tuple.
> This means extending the StructType class with an additional method.
> This is what the function would look like:
> /**
>  * Returns the index and field structure by name.
>  * If it doesn't find it, returns None.
>  * Avoids two client calls/loops to obtain consolidated field info.
>  *
>  */
> def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
>   val field = nameToField.get(name)
>   if (field.isDefined) {
>     Some((fieldIndex(name), field.get))
>   } else {
>     None
>   }
> }
> This is particularly useful from an efficiency perspective when we're parsing a 
> JSON structure and, for every field, want to check the name and type already 
> defined in the schema.
> I will create a corresponding branch for PR review, assuming there are no 
> concerns with the above proposal.
>  






[jira] [Updated] (SPARK-36986) Improving schema filtering flexibility

2022-02-05 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-36986:

Summary: Improving schema filtering flexibility  (was: Improving external 
schema management flexibility)

> Improving schema filtering flexibility
> --
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Rodrigo Boavida
>Priority: Major
>
> Our spark usage, requires us to build an external schema and pass it on while 
> creating a DataSet.
> While working through this, I found a couple of optimizations that would 
> greatly improve Spark's flexibility for handling external schema management.
> Scope: the ability to retrieve a field's index and definition in a single call, 
> looked up by name and returned as a tuple.
> Means extending the StructType class to support an additional method
> This is what the function would look like:
> /**
>  * Returns the index and field structure by name.
>  * If it doesn't find it, returns None.
>  * Avoids two client calls/loops to obtain consolidated field info.
> *
> */
> def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
>   val field = nameToField.get(name)
>   if (field.isDefined) {
>     Some((fieldIndex(name), field.get))
>   } else {
>     None
>   }
> }
> This is particularly useful from an efficiency perspective, when we're 
> parsing a Json structure and we want to check for every field what is the 
> name and field type already defined in the schema
> I will create a corresponding branch for PR review, assuming that there are 
> no concerns with the above proposal.
>  






[jira] [Updated] (SPARK-36986) Improving external schema management flexibility

2022-02-04 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-36986:

Docs Text: 
Schema management improvements 
1 - Retrieving a field name and type from a schema based on its index

  was:
Schema management improvements 
1 - Retrieving a field name and type from a schema based on its index
2 - Allowing external DataSet schemas to be provided, as well as their externally 
generated rows.


> Improving external schema management flexibility
> 
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Rodrigo Boavida
>Priority: Major
>
> Our spark usage, requires us to build an external schema and pass it on while 
> creating a DataSet.
> While working through this, I found a couple of optimizations that would 
> greatly improve Spark's flexibility for handling external schema management.
> Scope: the ability to retrieve a field's index and definition in a single call, 
> looked up by name and returned as a tuple.
> Means extending the StructType class to support an additional method
> This is what the function would look like:
> /**
>  * Returns the index and field structure by name.
>  * If it doesn't find it, returns None.
>  * Avoids two client calls/loops to obtain consolidated field info.
> *
> */
> def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
>   val field = nameToField.get(name)
>   if (field.isDefined) {
>     Some((fieldIndex(name), field.get))
>   } else {
>     None
>   }
> }
> This is particularly useful from an efficiency perspective, when we're 
> parsing a Json structure and we want to check for every field what is the 
> name and field type already defined in the schema
> I will create a corresponding branch for PR review, assuming that there are 
> no concerns with the above proposal.
>  






[jira] [Updated] (SPARK-36986) Improving external schema management flexibility

2022-02-04 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-36986:

Description: 
Our spark usage, requires us to build an external schema and pass it on while 
creating a DataSet.

While working through this, I found a couple of optimizations would improve 
greatly Spark's flexibility to handle external schema management.

Scope: ability to retrieve a field's name and schema in one single call, 
requesting to return a tupple by index. 

Means extending the StructType class to support an additional method

This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find it, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
*
*/
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = \{   val 
field = nameToField.get(name)   if(field.isDefined) \{     
Some((fieldIndex(name), field.get))   }
else
{     None   }
}

This is particularly useful from an efficiency perspective, when we're parsing 
a Json structure and we want to check for every field what is the name and 
field type already defined in the schema

I will create a corresponding branch for PR review, assuming that there are no 
concerns with the above proposal.

 

  was:
Our spark usage, requires us to build an external schema and pass it on while 
creating a DataSet.

While working through this, I found a couple of optimizations that would greatly 
improve Spark's flexibility for handling external schema management.

1 - The ability to retrieve a field's index and definition in a single call, 
looked up by name and returned as a tuple.

Means extending the StructType class to support an additional method

This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find it, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
 *
 */
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
  val field = nameToField.get(name)
  if (field.isDefined) {
    Some((fieldIndex(name), field.get))
  } else {
    None
  }
}

This is particularly useful from an efficiency perspective, when we're parsing 
a Json structure and we want to check for every field what is the name and 
field type already defined in the schema

 

2 - Allowing a DataSet to be created from a schema, passing the corresponding 
internal rows whose internal types map to the schema already defined externally. 
This allows Spark fields to be created from any data structure, without depending 
on Spark's internal conversions (in particular for JSON parsing), and improves 
performance by skipping the Catalyst conversion of native Java types into Spark 
types.

This is what the function would look like:

 

/**
 * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
 * the caller to create the InternalRow set externally, as well as define the schema externally.
 *
 * @since 3.3.0
 */
def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
  val attributes = schema.toAttributes
  val plan = LogicalRDD(attributes, data)(self)
  val qe = sessionState.executePlan(plan)
  qe.assertAnalyzed()
  new Dataset[Row](this, plan, RowEncoder(schema))
}

 

This is similar to this function:

def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

But it doesn't depend on Spark internally creating the RDD by inferring it, for 
example, from a JSON structure, which is not useful if we're managing the schema 
externally.

Also skips the Catalyst conversions and corresponding object overhead, making 
the internal rows generation much more efficient, by being done explicitly from 
the caller.

 

I will create a corresponding branch for PR review, assuming that there are no 
concerns with the above proposals.

 


> Improving external schema management flexibility
> 
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Rodrigo Boavida
>Priority: Major
>
> Our spark usage, requires us to build an external schema and pass it on while 
> creating a DataSet.
> While working through this, I found a couple of optimizations that would 
> greatly improve Spark's flexibility for handling external schema management.
> Scope: the ability to retrieve a field's index and definition in a single call, 
> looked up by name and returned as a tuple.
> Means extending the StructType class to support an additional method
> This is what the function would look like:
> /**
>  * Returns the index and field structure by name.
>  * If it doesn't find it, returns None.
> 

[jira] [Updated] (SPARK-36986) Improving external schema management flexibility

2022-02-04 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-36986:

Priority: Major  (was: Minor)

> Improving external schema management flexibility
> 
>
> Key: SPARK-36986
> URL: https://issues.apache.org/jira/browse/SPARK-36986
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Rodrigo Boavida
>Priority: Major
>
> Our spark usage, requires us to build an external schema and pass it on while 
> creating a DataSet.
> While working through this, I found a couple of optimizations that would 
> greatly improve Spark's flexibility for handling external schema management.
> 1 - The ability to retrieve a field's index and definition in a single call, 
> looked up by name and returned as a tuple.
> Means extending the StructType class to support an additional method
> This is what the function would look like:
> /**
>  * Returns the index and field structure by name.
>  * If it doesn't find it, returns None.
>  * Avoids two client calls/loops to obtain consolidated field info.
>  *
>  */
> def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
>   val field = nameToField.get(name)
>   if (field.isDefined) {
>     Some((fieldIndex(name), field.get))
>   } else {
>     None
>   }
> }
> This is particularly useful from an efficiency perspective, when we're 
> parsing a Json structure and we want to check for every field what is the 
> name and field type already defined in the schema
>  
> 2 - Allowing for a dataset to be created from a schema, and passing the 
> corresponding internal rows which the internal types map with the schema 
> already defined externally. This allows to create Spark fields based on any 
> data structure, without depending on Spark's internal conversions (in 
> particular for Json parsing), and improves performance by skipping the 
> CatalystConverts job of converting native Java types into Spark types.
> This is what the function would look like:
>  
> /**
>  * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
>  * the caller to create the InternalRow set externally, as well as define the schema externally.
>  *
>  * @since 3.3.0
>  */
> def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
>   val attributes = schema.toAttributes
>   val plan = LogicalRDD(attributes, data)(self)
>   val qe = sessionState.executePlan(plan)
>   qe.assertAnalyzed()
>   new Dataset[Row](this, plan, RowEncoder(schema))
> }
>  
> This is similar to this function:
> def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame
> But it doesn't depend on Spark internally creating the RDD by inferring it, for 
> example, from a JSON structure, which is not useful if we're managing the 
> schema externally.
> Also skips the Catalyst conversions and corresponding object overhead, making 
> the internal rows generation much more efficient, by being done explicitly 
> from the caller.
>  
> I will create a corresponding branch for PR review, assuming that there are 
> no concerns with the above proposals.
>  






[jira] [Created] (SPARK-37536) Allow for API user to disable Shuffle Operations while running locally

2021-12-03 Thread Rodrigo Boavida (Jira)
Rodrigo Boavida created SPARK-37536:
---

 Summary: Allow for API user to disable Shuffle Operations while 
running locally
 Key: SPARK-37536
 URL: https://issues.apache.org/jira/browse/SPARK-37536
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.3.0
 Environment: Spark running in local mode
Reporter: Rodrigo Boavida


We have been using Spark in local mode as a small embedded, in-memory SQL 
database for our microservice.

Spark's powerful SQL features and flexibility enable developers to build 
efficient data-querying solutions. Because our solution deals with small datasets 
that must be queried through SQL at very low latencies, we found the embedded 
approach a very good model.

Through experimentation we found that Spark in local mode gains significant 
performance improvements (on average 20-30%) by disabling the shuffling on 
aggregation operations. The shuffling is introduced by expanding the query 
execution plan with ShuffleExchangeExec or BroadcastExchangeExec.

I will be raising a PR to propose introducing a new configuration variable:

*spark.shuffle.local.enabled*

This variable will default to true and will be checked at EnsureRequirements 
creation time in QueryExecution; combined with a check that Spark is running in 
local mode, it will keep the execution plan unchanged if the value is false.

Looking forward to any comments and feedback.
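
For context, a minimal usage sketch from the application side, assuming the 
proposed flag ships under the name above; the flag is the only non-standard piece 
here, everything else is ordinary local-mode setup.

import org.apache.spark.sql.SparkSession

// Usage sketch only: spark.shuffle.local.enabled is the flag proposed in this
// ticket and does not exist in released Spark.
val spark = SparkSession.builder()
  .appName("embedded-sql")
  .master("local[*]")
  .config("spark.shuffle.local.enabled", "false")
  .getOrCreate()

// A small aggregation over an in-memory dataset, the kind of low-latency query
// the proposal targets in local mode.
spark.sql("SELECT id % 10 AS bucket, count(*) FROM range(1000) GROUP BY id % 10").show()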






[jira] [Updated] (SPARK-36986) Improving external schema management flexibility

2021-10-21 Thread Rodrigo Boavida (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rodrigo Boavida updated SPARK-36986:

Description: 
Our spark usage, requires us to build an external schema and pass it on while 
creating a DataSet.

While working through this, I found a couple of optimizations that would greatly 
improve Spark's flexibility for handling external schema management.

1 - The ability to retrieve a field's index and definition in a single call, 
looked up by name and returned as a tuple.

Means extending the StructType class to support an additional method

This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find it, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
 *
 */
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
  val field = nameToField.get(name)
  if (field.isDefined) {
    Some((fieldIndex(name), field.get))
  } else {
    None
  }
}

This is particularly useful from an efficiency perspective, when we're parsing 
a Json structure and we want to check for every field what is the name and 
field type already defined in the schema

 

2 - Allowing a DataSet to be created from a schema, passing the corresponding 
internal rows whose internal types map to the schema already defined externally. 
This allows Spark fields to be created from any data structure, without depending 
on Spark's internal conversions (in particular for JSON parsing), and improves 
performance by skipping the Catalyst conversion of native Java types into Spark 
types.

This is what the function would look like:

 

/**
 * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
 * the caller to create the InternalRow set externally, as well as define the schema externally.
 *
 * @since 3.3.0
 */
def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
  val attributes = schema.toAttributes
  val plan = LogicalRDD(attributes, data)(self)
  val qe = sessionState.executePlan(plan)
  qe.assertAnalyzed()
  new Dataset[Row](this, plan, RowEncoder(schema))
}

 

This is similar to this function:

def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

But it doesn't depend on Spark internally creating the RDD by inferring it, for 
example, from a JSON structure, which is not useful if we're managing the schema 
externally.

Also skips the Catalyst conversions and corresponding object overhead, making 
the internal rows generation much more efficient, by being done explicitly from 
the caller.

 

I will create a corresponding branch for PR review, assuming that there are no 
concerns with the above proposals.

 

  was:
Our spark usage, requires us to build an external schema and pass it on while 
creating a DataSet.

While working through this, I found a couple of optimizations would improve 
greatly Spark's flexibility to handle external schema management.

1 - The ability to retrieve a field's index and definition in a single call, 
looked up by name and returned as a tuple.

Means extending the StructType class to support an additional method

This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find it, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
 *
 */
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
  val field = nameToField.get(name)
  if(field.isDefined) {
    Some((fieldIndex(name), field.get))
  } else {
    None
  }
}

This is particularly useful from an efficiency perspective, when we're parsing 
a Json structure and we want to check for every field what is the name and 
field type already defined in the schema

 

2 - Allowing for a dataset to be created from a schema, and passing the 
corresponding internal rows which the internal types map with the schema 
already defined externally. this greatly improves the performance by skipping

This is what the function would look like:

 

/**
 * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
 * the caller to create the InternalRow set externally, as well as define the schema externally.
 *
 * @since 3.3.0
 */
def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
  val attributes = schema.toAttributes
  val plan = LogicalRDD(attributes, data)(self)
  val qe = sessionState.executePlan(plan)
  qe.assertAnalyzed()
  new Dataset[Row](this, plan, RowEncoder(schema))
}

 

This is similar to this function:

def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

But doesn't depend on Spark internally creating the RDD based by inferring for 
example from a Json structure. Which is not useful if we're managing the schema 
externally.

Also skips the Catalyst conversions and 

[jira] [Created] (SPARK-36986) Improving external schema management flexibility

2021-10-12 Thread Rodrigo Boavida (Jira)
Rodrigo Boavida created SPARK-36986:
---

 Summary: Improving external schema management flexibility
 Key: SPARK-36986
 URL: https://issues.apache.org/jira/browse/SPARK-36986
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.3.0
Reporter: Rodrigo Boavida


Our spark usage, requires us to build an external schema and pass it on while 
creating a DataSet.

While working through this, I found a couple of optimizations that would greatly 
improve Spark's flexibility for handling external schema management.

1 - The ability to retrieve a field's index and definition in a single call, 
looked up by name and returned as a tuple.

Means extending the StructType class to support an additional method

This is what the function would look like:

/**
 * Returns the index and field structure by name.
 * If it doesn't find it, returns None.
 * Avoids two client calls/loops to obtain consolidated field info.
 *
 */
def getIndexAndFieldByName(name: String): Option[(Int, StructField)] = {
  val field = nameToField.get(name)
  if(field.isDefined) {
    Some((fieldIndex(name), field.get))
  } else {
    None
  }
}

This is particularly useful from an efficiency perspective, when we're parsing 
a Json structure and we want to check for every field what is the name and 
field type already defined in the schema

 

2 - Allowing a DataSet to be created from a schema, passing the corresponding 
internal rows whose internal types map to the schema already defined externally. 
This greatly improves performance by skipping the Catalyst conversion of native 
Java types into Spark types.

This is what the function would look like:

 

/**
 * Creates a [[Dataset]] from an RDD of spark.sql.catalyst.InternalRow. This method allows
 * the caller to create the InternalRow set externally, as well as define the schema externally.
 *
 * @since 3.3.0
 */
def createDataset(data: RDD[InternalRow], schema: StructType): DataFrame = {
  val attributes = schema.toAttributes
  val plan = LogicalRDD(attributes, data)(self)
  val qe = sessionState.executePlan(plan)
  qe.assertAnalyzed()
  new Dataset[Row](this, plan, RowEncoder(schema))
}

 

This is similar to this function:

def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame

But it doesn't depend on Spark internally creating the RDD by inferring it, for 
example, from a JSON structure, which is not useful if we're managing the schema 
externally.

Also skips the Catalyst conversions and corresponding object overhead, making 
the internal rows generation much more efficient, by being done explicitly from 
the caller.

 

I will create a corresponding branch for PR review, assuming that there are no 
concerns with the above proposals.
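
A usage sketch of what calling the proposed overload could look like. The rows 
and schema below are made up for illustration, and the createDataset(RDD[InternalRow], 
StructType) call itself is the proposal, so it is left commented out; it does not 
exist in released Spark.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Externally managed schema.
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

// Rows built directly as InternalRow, using Catalyst's internal types
// (e.g. UTF8String), so no extra conversion pass is needed.
val rows = spark.sparkContext.parallelize(Seq(
  InternalRow(1L, UTF8String.fromString("a")),
  InternalRow(2L, UTF8String.fromString("b"))))

// Proposed call from this ticket (not available in released Spark):
// val df = spark.createDataset(rows, schema)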

 






[jira] [Commented] (SPARK-2629) Improved state management for Spark Streaming

2016-01-26 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117404#comment-15117404
 ] 

Rodrigo Boavida commented on SPARK-2629:


Hi,

I've experimented with the new API method but am struggling to find an option that 
gives the updateStateByKey behaviour of forcing all elements to be recomputed on 
every batch (this is a requirement for my application, as I am calculating 
duration fields based on the streaming intervals and updating an external store 
for every element).

It seems this function does not offer computing all elements as an option, and 
that this is by design, judging by the design doc.

Could someone please clarify?

tnks,
Rod

> Improved state management for Spark Streaming
> -
>
> Key: SPARK-2629
> URL: https://issues.apache.org/jira/browse/SPARK-2629
> Project: Spark
>  Issue Type: Epic
>  Components: Streaming
>Affects Versions: 0.9.2, 1.0.2, 1.2.2, 1.3.1, 1.4.1, 1.5.1
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>
>  Current updateStateByKey provides stateful processing in Spark Streaming. It 
> allows the user to maintain per-key state and manage that state using an 
> updateFunction. The updateFunction is called for each key, and it uses new 
> data and existing state of the key, to generate an updated state. However, 
> based on community feedback, we have learnt the following lessons.
> - Need for more optimized state management that does not scan every key
> - Need to make it easier to implement common use cases - (a) timeout of idle 
> data, (b) returning items other than state
> The high level idea that I am proposing is 
> - Introduce a new API -trackStateByKey- *mapWithState* that, allows the user 
> to update per-key state, and emit arbitrary records. The new API is necessary 
> as this will have significantly different semantics than the existing 
> updateStateByKey API. This API will have direct support for timeouts.
> - Internally, the system will keep the state data as a map/list within the 
> partitions of the state RDDs. The new data RDDs will be partitioned 
> appropriately, and for all the key-value data, it will lookup the map/list in 
> the state RDD partition and create a new list/map of updated state data. The 
> new state RDD partition will be created based on the update data and if 
> necessary, with old data. 
> Here is the detailed design doc (*outdated, to be updated*). Please take a 
> look and provide feedback as comments.
> https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em
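
For reference, a minimal usage sketch of the mapWithState API discussed above 
(names in the snippet are illustrative, Spark 1.6+ signatures). As the design 
notes, the mapping function only runs for keys with new data in the batch, or 
keys that time out; it does not scan every key, which is the behaviour questioned 
in the comment above.

import org.apache.spark.streaming.{Seconds, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

def trackCounts(events: DStream[(String, Int)]): DStream[(String, Int)] = {
  val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
    val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)  // persist the running total for this key
    (key, sum)         // the emitted record can differ from the stored state
  }
  events.mapWithState(StateSpec.function(mappingFunc).timeout(Seconds(30)))
}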






[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly

2015-12-15 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15058083#comment-15058083
 ] 

Rodrigo Boavida commented on SPARK-12219:
-

Just got everything running perfectly on the cluster on the latest 1.6 SNAPSHOT. 
I will start using 1.6 and above. Thanks for the help checking it.

Tnks,
Rod



> Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
> -
>
> Key: SPARK-12219
> URL: https://issues.apache.org/jira/browse/SPARK-12219
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.5.2
>Reporter: Rodrigo Boavida
>
> I've tried with no success to build Spark on Scala 2.11.7. I'm getting build 
> errors using sbt due to the issues found in the below thread in July of this 
> year.
> https://mail-archives.apache.org/mod_mbox/spark-dev/201507.mbox/%3CCA+3qhFSJGmZToGmBU1=ivy7kr6eb7k8t6dpz+ibkstihryw...@mail.gmail.com%3E
> Seems some minor fixes are needed to make the Scala 2.11 compiler happy.
> I needed to build with SBT, as suggested on the thread below, to get over an 
> apparent maven shade plugin issue which changed some classes when I changed to 
> akka 2.4.0.
> https://groups.google.com/forum/#!topic/akka-user/iai6whR6-xU
> I've set this bug to Major priority assuming that the Spark community wants 
> to keep fully supporting SBT builds, including the Scala 2.11 compatibility.
> Tnks,
> Rod






[jira] [Commented] (SPARK-12178) Expose reporting of StreamInputInfo for custom made streams

2015-12-15 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15058121#comment-15058121
 ] 

Rodrigo Boavida commented on SPARK-12178:
-

I plan to make my akka direct stream implementation open source, but this change 
would be absolutely necessary to have it complete. 
I heard there is someone working on a flume-based implementation of a direct 
stream, and I'm sure other streaming engines will follow soon.
Is there something I could do to push this forward? I don't mind being the one 
doing the change.

Tnks,
Rod



> Expose reporting of StreamInputInfo for custom made streams
> ---
>
> Key: SPARK-12178
> URL: https://issues.apache.org/jira/browse/SPARK-12178
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Rodrigo Boavida
>Priority: Minor
>
> For custom made direct streams, the Spark Streaming context needs to be 
> informed of the RDD count per batch execution. This is not exposed by the 
> InputDStream abstract class. 
> The suggestion is to create a method in the InputDStream class that reports 
> to the streaming context and make that available to child classes of 
> InputDStream.
> Signature example:
> def reportInfo(validTime : org.apache.spark.streaming.Time, inputInfo : 
> org.apache.spark.streaming.scheduler.StreamInputInfo)
> I have already done this on my own private branch. I can merge that change in 
> if approval is given.
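
A hedged sketch of how a custom direct stream might use such a hook. The class 
and its body are hypothetical, and the reportInfo call is commented out because 
it is the proposal in this ticket and is not exposed by InputDStream today.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.scheduler.StreamInputInfo

class MyDirectStream[T: ClassTag](streamingContext: StreamingContext)
  extends InputDStream[T](streamingContext) {

  override def start(): Unit = {}
  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[T]] = {
    val batch: RDD[T] = streamingContext.sparkContext.emptyRDD[T]  // placeholder for the real batch
    val numRecords = 0L                                            // would come from the source's metadata
    val info = StreamInputInfo(id, numRecords)
    // reportInfo(validTime, info)  // the hook proposed in this ticket; not available today
    Some(batch)
  }
}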






[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly

2015-12-14 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15057133#comment-15057133
 ] 

Rodrigo Boavida commented on SPARK-12219:
-

[~srowen] Just built and ran the 1.6 branch successfully off the cluster. Going to 
run it on the cluster tomorrow just to double-check the runtime is in good 
condition as well, and will let you know. 

Tnks,
Rod

> Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
> -
>
> Key: SPARK-12219
> URL: https://issues.apache.org/jira/browse/SPARK-12219
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.5.2
>Reporter: Rodrigo Boavida
>
> I've tried with no success to build Spark on Scala 2.11.7. I'm getting build 
> errors using sbt due to the issues found in the below thread in July of this 
> year.
> https://mail-archives.apache.org/mod_mbox/spark-dev/201507.mbox/%3CCA+3qhFSJGmZToGmBU1=ivy7kr6eb7k8t6dpz+ibkstihryw...@mail.gmail.com%3E
> Seems some minor fixes are needed to make the Scala 2.11 compiler happy.
> I needed to build with SBT, as suggested on the thread below, to get over an 
> apparent maven shade plugin issue which changed some classes when I changed to 
> akka 2.4.0.
> https://groups.google.com/forum/#!topic/akka-user/iai6whR6-xU
> I've set this bug to Major priority assuming that the Spark community wants 
> to keep fully supporting SBT builds, including the Scala 2.11 compatibility.
> Tnks,
> Rod






[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly

2015-12-14 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15056180#comment-15056180
 ] 

Rodrigo Boavida commented on SPARK-12219:
-

Sean - that's great news. Unfortunately I haven't had time to check on the 
errors. I will get the latest copy of 1.6, build it again and let you know.

Cheers,
Rod



> Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
> -
>
> Key: SPARK-12219
> URL: https://issues.apache.org/jira/browse/SPARK-12219
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.5.2
>Reporter: Rodrigo Boavida
>
> I've tried with no success to build Spark on Scala 2.11.7. I'm getting build 
> errors using sbt due to the issues found in the below thread in July of this 
> year.
> https://mail-archives.apache.org/mod_mbox/spark-dev/201507.mbox/%3CCA+3qhFSJGmZToGmBU1=ivy7kr6eb7k8t6dpz+ibkstihryw...@mail.gmail.com%3E
> Seems some minor fixes are needed to make the Scala 2.11 compiler happy.
> I needed to build with SBT, as suggested on the thread below, to get over an 
> apparent maven shade plugin issue which changed some classes when I changed to 
> akka 2.4.0.
> https://groups.google.com/forum/#!topic/akka-user/iai6whR6-xU
> I've set this bug to Major priority assuming that the Spark community wants 
> to keep fully supporting SBT builds, including the Scala 2.11 compatibility.
> Tnks,
> Rod






[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly

2015-12-09 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049168#comment-15049168
 ] 

Rodrigo Boavida commented on SPARK-12219:
-

I could look into that tomorrow morning (my day is ending over in Ireland :)). 
Could you give me some hints on how to proceed? Would be my first contribution 
to Spark.

Tnks,
Rod



> Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
> -
>
> Key: SPARK-12219
> URL: https://issues.apache.org/jira/browse/SPARK-12219
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.5.2
>Reporter: Rodrigo Boavida
>
> I've tried with no success to build Spark on Scala 2.11.7. I'm getting build 
> errors using sbt due to the issues found in the below thread in July of this 
> year.
> https://mail-archives.apache.org/mod_mbox/spark-dev/201507.mbox/%3CCA+3qhFSJGmZToGmBU1=ivy7kr6eb7k8t6dpz+ibkstihryw...@mail.gmail.com%3E
> Seems some minor fixes are needed to make the Scala 2.11 compiler happy.
> I needed to build with SBT, as suggested on the thread below, to get over an 
> apparent maven shade plugin issue which changed some classes when I changed to 
> akka 2.4.0.
> https://groups.google.com/forum/#!topic/akka-user/iai6whR6-xU
> I've set this bug to Major priority assuming that the Spark community wants 
> to keep fully supporting SBT builds, including the Scala 2.11 compatibility.
> Tnks,
> Rod






[jira] [Commented] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly

2015-12-09 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15049044#comment-15049044
 ] 

Rodrigo Boavida commented on SPARK-12219:
-

Hi Sean,

I've proceeded as instructed in the documentation: I applied the Scala version 
change script for 2.11.7, and also changed the Scala version in the main pom.xml 
to 2.11.7, which is not done by the script.

The errors I get when I run: "build/sbt -Pyarn -Phadoop-2.3 -Dscala-2.11 
assembly"

[error] 
/home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala:40:
 no valid targets for annotation on value rdd - it is discarded unused. You may 
specify targets with meta-annotations, e.g. @(transient @param)
[error] @transient rdd: RDD[T],
[error]
[error] 
/home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala:42:
 no valid targets for annotation on value parentRddPartitionIndex - it is 
discarded unused. You may specify targets with meta-annotations, e.g. 
@(transient @param)
[error] @transient parentRddPartitionIndex: Int)
[error]
[error] 
/home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala:35:
 no valid targets for annotation on value partitionFilterFunc - it is discarded 
unused. You may specify targets with meta-annotations, e.g. @(transient @param)
[error] private[spark] class PruneDependency[T](rdd: RDD[T], @transient 
partitionFilterFunc: Int => Boolean)
[error]
[error] 
/home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala:58:
 no valid targets for annotation on value prev - it is discarded unused. You 
may specify targets with meta-annotations, e.g. @(transient @param)
[error] @transient prev: RDD[T],
[error]
[error] 
/home/spark/sbt_spark-1.5.2/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala:59:
 no valid targets for annotation on value partitionFilterFunc - it is discarded 
unused. You may specify targets with meta-annotations, e.g. @(transient @param)
[error] @transient partitionFilterFunc: Int => Boolean)
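
For reference, the compiler's own hint points at the meta-annotation form. A 
minimal sketch of both shapes, on made-up classes (this is not the actual 
Spark fix, which may differ per call site, and it assumes spark-core on the 
classpath):

import scala.annotation.meta.param

import org.apache.spark.rdd.RDD

// Shape that Scala 2.11.7 rejects in this build: @transient on a plain
// constructor parameter has no valid target, so the annotation is discarded
// and the warning is reported as an error.
//   class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)

// Shape the compiler hint suggests: give the annotation an explicit target
// via a meta-annotation.
class PruneExample[T](@(transient @param) partitionFilterFunc: Int => Boolean)
  extends Serializable

// Alternative when the value really is kept as a field that must not be
// serialized: make it an explicit transient field.
class UnionExample[T](@transient private val rdd: RDD[T]) extends Serializable {
  def partitionCount: Int = rdd.partitions.length
}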


> Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly
> -
>
> Key: SPARK-12219
> URL: https://issues.apache.org/jira/browse/SPARK-12219
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 1.5.2
>Reporter: Rodrigo Boavida
>
> I've tried, with no success, to build Spark on Scala 2.11.7. I'm getting 
> build errors with sbt due to the issues discussed in the thread below, from 
> July of this year.
> https://mail-archives.apache.org/mod_mbox/spark-dev/201507.mbox/%3CCA+3qhFSJGmZToGmBU1=ivy7kr6eb7k8t6dpz+ibkstihryw...@mail.gmail.com%3E
> Seems some minor fixes are needed to make the Scala 2.11 compiler happy.
> I needed to build with SBT, as suggested in the thread below, to get around 
> an apparent Maven shade plugin issue that changed some classes when I 
> switched to Akka 2.4.0.
> https://groups.google.com/forum/#!topic/akka-user/iai6whR6-xU
> I've set this bug to Major priority, assuming that the Spark community wants 
> to keep fully supporting SBT builds, including Scala 2.11 compatibility.
> Tnks,
> Rod






[jira] [Created] (SPARK-12219) Spark 1.5.2 code does not build on Scala 2.11.7 with SBT assembly

2015-12-08 Thread Rodrigo Boavida (JIRA)
Rodrigo Boavida created SPARK-12219:
---

 Summary: Spark 1.5.2 code does not build on Scala 2.11.7 with SBT 
assembly
 Key: SPARK-12219
 URL: https://issues.apache.org/jira/browse/SPARK-12219
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.5.2
Reporter: Rodrigo Boavida


I've tried, with no success, to build Spark on Scala 2.11.7. I'm getting build 
errors with sbt due to the issues discussed in the thread below, from July of 
this year.
https://mail-archives.apache.org/mod_mbox/spark-dev/201507.mbox/%3CCA+3qhFSJGmZToGmBU1=ivy7kr6eb7k8t6dpz+ibkstihryw...@mail.gmail.com%3E

Seems some minor fixes are needed to make the Scala 2.11 compiler happy.

I needed to build with SBT, as suggested in the thread below, to get around an 
apparent Maven shade plugin issue that changed some classes when I switched to 
Akka 2.4.0.
https://groups.google.com/forum/#!topic/akka-user/iai6whR6-xU

I've set this bug to Major priority assuming that the Spark community wants to 
keep fully supporting SBT builds, including Scala 2.11 compatibility.

Tnks,
Rod






[jira] [Commented] (SPARK-12178) Expose reporting of StreamInputInfo for custom made streams

2015-12-07 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15044846#comment-15044846
 ] 

Rodrigo Boavida commented on SPARK-12178:
-

This applies to any new implementation of a custom stream.
For example, DirectKafkaInputDStream is a custom stream with its own compute 
method and its own way of building the StreamInputInfo, so it reports the 
ingestion rate and related information to the Spark Streaming context 
differently than ReceiverInputDStream does.

I'm currently implementing a DStream similar to the Kafka direct stream that 
uses Akka to retrieve data on each worker, so the ingestion reporting needs to 
be custom-made as well.

If this reporting function is not exposed, the Spark Streaming UI page cannot 
show us the events/sec rate. I've added a rough sketch of how the built-in 
direct stream handles this today below.

I hope this helps to clarify the requirement.

tnks,
Rod
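
For context, this is roughly what the Kafka direct stream does inside 
compute() to get the events/sec figure onto the streaming UI, based on my 
reading of the 1.5.x sources. The Akka-backed stream name and the fetchBatch 
helper below are made up, and the reporting call is left commented out because 
it only compiles from inside the org.apache.spark.streaming package:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.scheduler.StreamInputInfo

// Hypothetical Akka-backed direct stream, with no Receiver involved.
class AkkaDirectInputDStream(_ssc: StreamingContext)
  extends InputDStream[String](_ssc) {

  override def start(): Unit = {}
  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[String]] = {
    val records = fetchBatch(validTime)                  // custom ingestion
    val rdd = context.sparkContext.parallelize(records)

    // This is the step this ticket asks to expose. Today it only compiles for
    // streams living inside org.apache.spark.streaming (like the Kafka direct
    // stream), because ssc and its scheduler are private[streaming]:
    //
    //   ssc.scheduler.inputInfoTracker.reportInfo(
    //     validTime, StreamInputInfo(id, records.size))

    Some(rdd)
  }

  // Stand-in for the Akka-based fetch of this batch's records.
  private def fetchBatch(validTime: Time): Seq[String] = Seq.empty
}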

> Expose reporting of StreamInputInfo for custom made streams
> ---
>
> Key: SPARK-12178
> URL: https://issues.apache.org/jira/browse/SPARK-12178
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Rodrigo Boavida
>Priority: Minor
>
> For custom-made direct streams, the Spark Streaming context needs to be 
> informed of the RDD count per batch execution. This is not exposed by the 
> InputDStream abstract class.
> The suggestion is to create a method in the InputDStream class that reports 
> to the streaming context and to make it available to child classes of 
> InputDStream.
> Signature example:
> def reportInfo(validTime: org.apache.spark.streaming.Time, inputInfo: 
> org.apache.spark.streaming.scheduler.StreamInputInfo)
> I have already done this on my own private branch. I can merge that change in 
> if approval is given.






[jira] [Created] (SPARK-12178) Expose reporting of StreamInputInfo for custom made streams

2015-12-07 Thread Rodrigo Boavida (JIRA)
Rodrigo Boavida created SPARK-12178:
---

 Summary: Expose reporting of StreamInputInfo for custom made 
streams
 Key: SPARK-12178
 URL: https://issues.apache.org/jira/browse/SPARK-12178
 Project: Spark
  Issue Type: Improvement
  Components: Streaming
Reporter: Rodrigo Boavida
Priority: Minor


For custom-made direct streams, the Spark Streaming context needs to be 
informed of the RDD count per batch execution. This is not exposed by the 
InputDStream abstract class.
The suggestion is to create a method in the InputDStream class that reports to 
the streaming context and to make it available to child classes of InputDStream.
Signature example:
def reportInfo(validTime: org.apache.spark.streaming.Time, inputInfo: 
org.apache.spark.streaming.scheduler.StreamInputInfo)

I have already done this on my own private branch. I can merge that change in 
if approval is given.
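
To illustrate the idea, here is a rough sketch of how such a hook could look. 
This is illustrative only, not the exact code on my branch; it is written as a 
sibling abstract class so it compiles standalone inside Spark's source tree, 
whereas the real change would simply add reportInfo to InputDStream:

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.scheduler.StreamInputInfo

// Sketch of the proposed hook. `protected` so that any InputDStream subclass,
// in any package, can report its per-batch input info; the body stays inside
// Spark, where the private[streaming] scheduler is reachable.
abstract class ReportingInputDStream[T: ClassTag](ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  /** Report this batch's input info to the streaming context / UI. */
  protected def reportInfo(validTime: Time, inputInfo: StreamInputInfo): Unit = {
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
  }
}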






[jira] [Commented] (SPARK-12178) Expose reporting of StreamInputInfo for custom made streams

2015-12-07 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15044850#comment-15044850
 ] 

Rodrigo Boavida commented on SPARK-12178:
-

This is probably the first step in the effort to make the Direct Stream 
concept generic and reusable across different technologies (not just Kafka). 
The Reactive Streams concept is an example of a further step.

I'd like to tag Prakash Chockalingam from Databricks, with whom I had this 
conversation at the latest Spark Summit, but I can't find his username.
CCing Iulian as well:
[~dragos]

Tnks,
Rod

> Expose reporting of StreamInputInfo for custom made streams
> ---
>
> Key: SPARK-12178
> URL: https://issues.apache.org/jira/browse/SPARK-12178
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Rodrigo Boavida
>Priority: Minor
>
> For custom-made direct streams, the Spark Streaming context needs to be 
> informed of the RDD count per batch execution. This is not exposed by the 
> InputDStream abstract class.
> The suggestion is to create a method in the InputDStream class that reports 
> to the streaming context and to make it available to child classes of 
> InputDStream.
> Signature example:
> def reportInfo(validTime: org.apache.spark.streaming.Time, inputInfo: 
> org.apache.spark.streaming.scheduler.StreamInputInfo)
> I have already done this on my own private branch. I can merge that change in 
> if approval is given.






[jira] [Commented] (SPARK-10420) Implementing Reactive Streams based Spark Streaming Receiver

2015-11-04 Thread Rodrigo Boavida (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14989521#comment-14989521
 ] 

Rodrigo Boavida commented on SPARK-10420:
-

Hi all,
I've touched base about this with Luc, Julian and Prakash at Spark Summit EU.

A Reactive Streams (RS) implementation is extremely important for a 
distributed system to consume data at its own pace without risking 
overwhelming the available resources and hitting failure conditions such as 
OOM. My company has a strong interest in this feature, given we are 
implementing a new streaming architecture.

The current implementation of Spark Streaming has the Receiver as the entry 
point for data, which can easily become a constraint: failures or slowdowns on 
the Receiver's JVM can dramatically impact the whole Spark Streaming 
application. When the data source allows the feed to be partitioned (like 
Kafka, or Akka with sharding), building the receiving logic into each 
executor, so that each executor consumes its own slice of the feed, leverages 
the parallel processing nature of Spark. This is already done with the Kafka 
direct stream.

I would be very interested in this feature, in particular without a receiver 
and with a direct-stream approach (like the Kafka one) where each executor 
subscribes directly, based on the type of streaming context. For example, for 
an Akka-based RS stream, this could be driven by a sharding function passed 
into the streaming context.

This would also potentially address the currently known issue with Mesos and 
dynamic allocation, where Mesos can bring down the executor on which the 
Receiver is running and thus stop the whole stream. If there is no Receiver, 
the stream would not have to stop, and it would be up to each streaming 
context to define how the stream slices are redistributed. For example, with 
an Akka-based RS streaming context I assume this could be achieved easily.

I intend to create two tickets: one for a generic RS abstraction layer on the 
executors, and another for a specific Akka RS-based streaming context. I will 
tag a few people on both. A rough sketch of the receiver-less idea is below.

I will appreciate any comments!

Tnks,
Rod
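
To sketch what I mean by each executor owning its own slice of the feed (all 
names here, ShardedBatchRDD, ShardPartition, fetchShard, are made up and only 
stand in for whatever the Akka/RS binding would provide, not a proposal for 
actual APIs): per batch, the streaming context would produce an RDD with one 
partition per shard, and each partition is computed on an executor that talks 
to its slice of the source directly, so there is no single Receiver JVM to 
lose.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// One partition per shard of the feed; `index` satisfies Partition.
case class ShardPartition(index: Int, shardId: Int) extends Partition

// fetchShard is serialized to the executors and opens the per-shard
// subscription there (e.g. an Akka/RS subscription bound to one shard).
class ShardedBatchRDD(
    sc: SparkContext,
    shardIds: Seq[Int],
    fetchShard: Int => Iterator[String])
  extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](shardIds.length)(i => ShardPartition(i, shardIds(i)))

  // Runs on the executor owning this partition: it consumes its own slice of
  // the feed directly, with no central Receiver in the path.
  // getPreferredLocations could additionally pin each shard to an executor.
  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    fetchShard(split.asInstanceOf[ShardPartition].shardId)
}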

> Implementing Reactive Streams based Spark Streaming Receiver
> 
>
> Key: SPARK-10420
> URL: https://issues.apache.org/jira/browse/SPARK-10420
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Nilanjan Raychaudhuri
>Priority: Minor
>
> Hello TD,
> This is probably the last bit of the back-pressure story: implementing 
> ReactiveStreams-based Spark Streaming receivers. After discussing this with 
> my Typesafe team, we came up with the following design document:
> https://docs.google.com/document/d/1lGQKXfNznd5SPuQigvCdLsudl-gcvWKuHWr0Bpn3y30/edit?usp=sharing
> Could you please take a look at this when you get a chance?
> Thanks
> Nilanjan


