[jira] [Commented] (SPARK-20883) Improve StateStore APIs for efficiency

2018-03-14 Thread jincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399930#comment-16399930
 ] 

jincheng commented on SPARK-20883:
--

It is convenient to cherry-pick this to 2.2 without any changes.

> Improve StateStore APIs for efficiency
> --
>
> Key: SPARK-20883
> URL: https://issues.apache.org/jira/browse/SPARK-20883
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 2.3.0
>
>
> The current state store API has several problems that create too many 
> transient objects, causing memory pressure.
> - StateStore.get() returns Options, which forces creation of Some/None objects 
> for every get
> - StateStore.iterator() returns tuples, which forces creation of a new tuple for 
> each record returned
> - StateStore.updates() requires the implementation to keep track of updates, 
> while this is used minimally (only by Append mode in streaming aggregations). 
> This can be totally removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-14 Thread Deepansh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398237#comment-16398237
 ] 

Deepansh edited comment on SPARK-23650 at 3/15/18 4:39 AM:
---

attached more logs.


was (Author: litup):
attaching more of logs.

> Slow SparkR udf (dapply)
> 
>
> Key: SPARK-23650
> URL: https://issues.apache.org/jira/browse/SPARK-23650
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell, SparkR, Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Deepansh
>Priority: Major
> Attachments: sparkR_log2.txt, sparkRlag.txt
>
>
> For example, I am reading streams from Kafka and want to apply a model built 
> in R to those streams. For this, I am using dapply.
> My code is:
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source",
>                      kafka.bootstrap.servers = "localhost:9092", topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   i_model <- SparkR:::value(randomBr)
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     y <- toJSON(y)
>     x[row, "value"] <- y
>   }
>   x
> }, schema)
> Every time Kafka streams are fetched, the dapply method creates a new runner 
> thread and ships the variables again, which causes a large lag (~2s for 
> shipping the model) each time. I even tried without broadcast variables, but 
> it takes the same time to ship the variables. Can some other technique be 
> applied to improve its performance?






[jira] [Commented] (SPARK-23691) Use sql_conf util in PySpark tests where possible

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399899#comment-16399899
 ] 

Apache Spark commented on SPARK-23691:
--

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/20830

> Use sql_conf util in PySpark tests where possible
> -
>
> Key: SPARK-23691
> URL: https://issues.apache.org/jira/browse/SPARK-23691
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Priority: Minor
>
> https://github.com/apache/spark/commit/d6632d185e147fcbe6724545488ad80dce20277e
>  added a useful util
> {code}
> @contextmanager
> def sql_conf(self, pairs):
>     ...
> {code}
> to allow configuration set/unset within a block:
> {code}
> with self.sql_conf({"spark.blah.blah.blah": "blah"}):
>     # test codes
> {code}
> It would be nicer if we used it. 
> Note that there already appear to be a few places that affect tests without 
> restoring the original value in unittest classes.






[jira] [Assigned] (SPARK-23691) Use sql_conf util in PySpark tests where possible

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23691:


Assignee: (was: Apache Spark)

> Use sql_conf util in PySpark tests where possible
> -
>
> Key: SPARK-23691
> URL: https://issues.apache.org/jira/browse/SPARK-23691
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Priority: Minor
>
> https://github.com/apache/spark/commit/d6632d185e147fcbe6724545488ad80dce20277e
>  added a useful util
> {code}
> @contextmanager
> def sql_conf(self, pairs):
>     ...
> {code}
> to allow configuration set/unset within a block:
> {code}
> with self.sql_conf({"spark.blah.blah.blah": "blah"}):
>     # test codes
> {code}
> It would be nicer if we used it. 
> Note that there already appear to be a few places that affect tests without 
> restoring the original value in unittest classes.






[jira] [Assigned] (SPARK-23691) Use sql_conf util in PySpark tests where possible

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23691:


Assignee: Apache Spark

> Use sql_conf util in PySpark tests where possible
> -
>
> Key: SPARK-23691
> URL: https://issues.apache.org/jira/browse/SPARK-23691
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Hyukjin Kwon
>Assignee: Apache Spark
>Priority: Minor
>
> https://github.com/apache/spark/commit/d6632d185e147fcbe6724545488ad80dce20277e
>  added a useful util
> {code}
> @contextmanager
> def sql_conf(self, pairs):
>     ...
> {code}
> to allow configuration set/unset within a block:
> {code}
> with self.sql_conf({"spark.blah.blah.blah": "blah"}):
>     # test codes
> {code}
> It would be nicer if we used it. 
> Note that there already appear to be a few places that affect tests without 
> restoring the original value in unittest classes.






[jira] [Created] (SPARK-23691) Use sql_conf util in PySpark tests where possible

2018-03-14 Thread Hyukjin Kwon (JIRA)
Hyukjin Kwon created SPARK-23691:


 Summary: Use sql_conf util in PySpark tests where possible
 Key: SPARK-23691
 URL: https://issues.apache.org/jira/browse/SPARK-23691
 Project: Spark
  Issue Type: Improvement
  Components: PySpark
Affects Versions: 2.3.0
Reporter: Hyukjin Kwon


https://github.com/apache/spark/commit/d6632d185e147fcbe6724545488ad80dce20277e 
added a useful util

{code}
@contextmanager
def sql_conf(self, pairs):
    ...
{code}

to allow configuration set/unset within a block:

{code}
with self.sql_conf({"spark.blah.blah.blah": "blah"}):
    # test codes
{code}

It would be nicer if we used it. 

Note that there already appear to be a few places that affect tests without 
restoring the original value in unittest classes.
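
The set-and-restore behaviour described above can be sketched without a running Spark session. This is only an illustration: FakeConf is a hypothetical stand-in for the session configuration object, and this sql_conf is a free function that takes the configuration explicitly instead of being a test-class method as in the linked commit.

```python
from contextlib import contextmanager


class FakeConf:
    """Hypothetical stand-in for a Spark session's conf, backed by a dict."""

    def __init__(self):
        self._values = {}

    def get(self, key, default=None):
        return self._values.get(key, default)

    def set(self, key, value):
        self._values[key] = value

    def unset(self, key):
        self._values.pop(key, None)


@contextmanager
def sql_conf(conf, pairs):
    """Set every key in `pairs` on `conf`, then restore the old values on exit."""
    assert isinstance(pairs, dict), "pairs should be a dictionary."
    # Remember the current value of each key (None means "was not set").
    old_values = {key: conf.get(key, None) for key in pairs}
    for key, value in pairs.items():
        conf.set(key, value)
    try:
        yield
    finally:
        # Restore even if the body of the with-block raised.
        for key, old_value in old_values.items():
            if old_value is None:
                conf.unset(key)
            else:
                conf.set(key, old_value)


conf = FakeConf()
conf.set("spark.blah.blah.blah", "original")
with sql_conf(conf, {"spark.blah.blah.blah": "blah", "spark.other": "x"}):
    inside = conf.get("spark.blah.blah.blah")
```

Because the restore step sits in a finally clause, a failing test body still leaves the configuration as it found it, which is the property the note about unittest classes is concerned with.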






[jira] [Resolved] (SPARK-23642) isZero scaladoc for LongAccumulator describes wrong method

2018-03-14 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-23642.
--
   Resolution: Fixed
 Assignee: Sean
Fix Version/s: 2.4.0
   2.3.1

Fixed in https://github.com/apache/spark/pull/20790

> isZero scaladoc for LongAccumulator describes wrong method
> --
>
> Key: SPARK-23642
> URL: https://issues.apache.org/jira/browse/SPARK-23642
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation
>Affects Versions: 2.3.0
>Reporter: Sean
>Assignee: Sean
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.3.1, 2.4.0
>
>
> Method isZero is described in the scaladoc as "Adds v to the accumulator, 
> i.e. increment sum by v and count by 1." when in fact it checks to see if it 
> is zero.
> Pull request [https://github.com/apache/spark/pull/20790] fixes.






[jira] [Commented] (SPARK-23684) mode append function not working

2018-03-14 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399817#comment-16399817
 ] 

Hyukjin Kwon commented on SPARK-23684:
--

I think it's a duplicate of SPARK-22437. Can you try it out in 2.3.0?

> mode append function not working 
> -
>
> Key: SPARK-23684
> URL: https://issues.apache.org/jira/browse/SPARK-23684
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.2.0
>Reporter: Evan Zamir
>Priority: Minor
>
> {{df.write.mode('append').jdbc(url, table, properties=\{"driver": 
> "org.postgresql.Driver"}) }}
> produces the following error and does not write to existing table:
> {{2018-03-14 11:00:08,332 root ERROR An error occurred while calling 
> o894.jdbc.}}
> {{: scala.MatchError: null}}
> {{ at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:62)}}
> {{ at 
> org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)}}
> {{ at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)}}
> {{ at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)}}
> {{ at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)}}
> {{ at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)}}
> {{ at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)}}
> {{ at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)}}
> {{ at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)}}
> {{ at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)}}
> {{ at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)}}
> {{ at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)}}
> {{ at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)}}
> {{ at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)}}
> {{ at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)}}
> {{ at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)}}
> {{ at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:461)}}
> {{ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)}}
> {{ at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)}}
> {{ at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)}}
> {{ at java.lang.reflect.Method.invoke(Method.java:498)}}
> {{ at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)}}
> {{ at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)}}
> {{ at py4j.Gateway.invoke(Gateway.java:280)}}
> {{ at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)}}
> {{ at py4j.commands.CallCommand.execute(CallCommand.java:79)}}
> {{ at py4j.GatewayConnection.run(GatewayConnection.java:214)}}
> {{ at java.lang.Thread.run(Thread.java:745)}}
> However,
> {{df.write.jdbc(url, table, properties=\{"driver": 
> "org.postgresql.Driver"},mode='append')}}
> does not produce an error and adds a row to an existing table.






[jira] [Assigned] (SPARK-23690) VectorAssembler should have handleInvalid to handle columns with null values

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23690:


Assignee: (was: Apache Spark)

> VectorAssembler should have handleInvalid to handle columns with null values
> 
>
> Key: SPARK-23690
> URL: https://issues.apache.org/jira/browse/SPARK-23690
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: yogesh garg
>Priority: Major
>
> VectorAssembler only takes in numeric (and vectors (of numeric?)) columns as 
> an input and returns the assembled vector. It currently throws an error if it 
> sees a null value in any column. This behavior also affects `RFormula` that 
> uses VectorAssembler to assemble numeric columns.






[jira] [Comment Edited] (SPARK-19842) Informational Referential Integrity Constraints Support in Spark

2018-03-14 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399586#comment-16399586
 ] 

Takeshi Yamamuro edited comment on SPARK-19842 at 3/15/18 2:41 AM:
---

I think he just wanted to make comments in the google doc editable for reviews.


was (Author: maropu):
I think he just want to make the google doc editable for reviews.

> Informational Referential Integrity Constraints Support in Spark
> 
>
> Key: SPARK-19842
> URL: https://issues.apache.org/jira/browse/SPARK-19842
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Ioana Delaney
>Priority: Major
> Attachments: InformationalRIConstraints.doc
>
>
> *Informational Referential Integrity Constraints Support in Spark*
> This work proposes support for _informational primary key_ and _foreign key 
> (referential integrity) constraints_ in Spark. The main purpose is to open up 
> an area of query optimization techniques that rely on referential integrity 
> constraints semantics. 
> An _informational_ or _statistical constraint_ is a constraint such as a 
> _unique_, _primary key_, _foreign key_, or _check constraint_, that can be 
> used by Spark to improve query performance. Informational constraints are not 
> enforced by the Spark SQL engine; rather, they are used by Catalyst to 
> optimize the query processing. They provide semantics information that allows 
> Catalyst to rewrite queries to eliminate joins, push down aggregates, remove 
> unnecessary Distinct operations, and perform a number of other optimizations. 
> Informational constraints are primarily targeted to applications that load 
> and analyze data that originated from a data warehouse. For such 
> applications, the conditions for a given constraint are known to be true, so 
> the constraint does not need to be enforced during data load operations. 
> The attached document covers constraint definition, metastore storage, 
> constraint validation, and maintenance. The document shows many examples of 
> query performance improvements that utilize referential integrity constraints 
> and can be implemented in Spark.
> Link to the google doc: 
> [InformationalRIConstraints|https://docs.google.com/document/d/17r-cOqbKF7Px0xb9L7krKg2-RQB_gD2pxOmklm-ehsw/edit]






[jira] [Commented] (SPARK-23690) VectorAssembler should have handleInvalid to handle columns with null values

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399815#comment-16399815
 ] 

Apache Spark commented on SPARK-23690:
--

User 'yogeshg' has created a pull request for this issue:
https://github.com/apache/spark/pull/20829

> VectorAssembler should have handleInvalid to handle columns with null values
> 
>
> Key: SPARK-23690
> URL: https://issues.apache.org/jira/browse/SPARK-23690
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: yogesh garg
>Priority: Major
>
> VectorAssembler only takes in numeric (and vectors (of numeric?)) columns as 
> an input and returns the assembled vector. It currently throws an error if it 
> sees a null value in any column. This behavior also affects `RFormula` that 
> uses VectorAssembler to assemble numeric columns.






[jira] [Assigned] (SPARK-23690) VectorAssembler should have handleInvalid to handle columns with null values

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23690:


Assignee: Apache Spark

> VectorAssembler should have handleInvalid to handle columns with null values
> 
>
> Key: SPARK-23690
> URL: https://issues.apache.org/jira/browse/SPARK-23690
> Project: Spark
>  Issue Type: Sub-task
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: yogesh garg
>Assignee: Apache Spark
>Priority: Major
>
> VectorAssembler only takes in numeric (and vectors (of numeric?)) columns as 
> an input and returns the assembled vector. It currently throws an error if it 
> sees a null value in any column. This behavior also affects `RFormula` that 
> uses VectorAssembler to assemble numeric columns.






[jira] [Created] (SPARK-23690) VectorAssembler should have handleInvalid to handle columns with null values

2018-03-14 Thread yogesh garg (JIRA)
yogesh garg created SPARK-23690:
---

 Summary: VectorAssembler should have handleInvalid to handle 
columns with null values
 Key: SPARK-23690
 URL: https://issues.apache.org/jira/browse/SPARK-23690
 Project: Spark
  Issue Type: Sub-task
  Components: ML
Affects Versions: 2.3.0
Reporter: yogesh garg


VectorAssembler only takes in numeric (and vectors (of numeric?)) columns as an 
input and returns the assembled vector. It currently throws an error if it sees 
a null value in any column. This behavior also affects `RFormula` that uses 
VectorAssembler to assemble numeric columns.
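
To make the request concrete, here is a minimal sketch of what a handleInvalid option could mean for rows containing nulls. It is plain Python, not Spark code: the function assemble and the mode names "error", "skip" and "keep" are assumptions modelled on the handleInvalid options of other Spark ML transformers, and the issue itself does not specify them.

```python
import math


def assemble(row, handle_invalid="error"):
    """Assemble one row of numeric columns into a flat list of floats.

    Hypothetical semantics: "error" raises on a null, "skip" drops the row
    (returns None), "keep" replaces each null with NaN.
    """
    if handle_invalid not in ("error", "skip", "keep"):
        raise ValueError(f"unknown handle_invalid: {handle_invalid!r}")
    if any(value is None for value in row):
        if handle_invalid == "error":
            raise ValueError("null value found in input columns")
        if handle_invalid == "skip":
            return None
        return [math.nan if value is None else float(value) for value in row]
    return [float(value) for value in row]


clean = assemble([1, 2.5])
skipped = assemble([1, None], handle_invalid="skip")
kept = assemble([1, None], handle_invalid="keep")
```

The default of "error" mirrors the current behaviour described above, so existing pipelines would be unaffected unless they opt in to one of the other modes.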






[jira] [Created] (SPARK-23689) Spark 2.3.0/2.2.1 Some changes cause org.apache.spark.sql.catalyst.errors.package$TreeNodeException:

2018-03-14 Thread Yuandong Song (JIRA)
Yuandong Song created SPARK-23689:
-

 Summary: Spark 2.3.0/2.2.1 Some changes cause 
org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
 Key: SPARK-23689
 URL: https://issues.apache.org/jira/browse/SPARK-23689
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0, 2.2.1
 Environment: My spark standalone cluster has two workers (MEM 250G) 
and one master.

Mysql's version is 5.7.
Reporter: Yuandong Song


{code:java}
// Assumes ss is an existing SparkSession, with java.util.Properties,
// org.apache.spark.sql.Dataset and org.apache.spark.sql.Row imported.
String[] str = {"supplier", "nation", "partsupp", "part", "lineitem"};

// Load each TPC-H table from MySQL over JDBC and register it as a temp view.
for (int i = 0; i < str.length; i++) {
    System.out.println(i);
    Dataset<Row> t1 = ss.read().format("jdbc")
            .option("useSSL", false)
            .option("driver", "com.mysql.jdbc.Driver")
            .option("url", "jdbc:mysql://172.16.50.104:19100/test_tpch_1g")
            .option("user", "dbscale")
            .option("password", "abc123")
            .option("dbtable", str[i])
            .load();
    t1.createOrReplaceTempView(str[i]);
    t1.show();
}

Properties connProp = new Properties();
connProp.put("driver", "com.mysql.jdbc.Driver");
connProp.put("useSSL", "false");
connProp.put("user", "dbscale");
connProp.put("password", "abc123");

// Query with an IN subquery and a correlated scalar subquery.
String sqlstr = "SELECT ps_suppkey FROM partsupp WHERE ps_partkey IN " +
        "( SELECT p_partkey FROM part WHERE p_name LIKE 'dark%' ) AND ps_availqty > " +
        "( SELECT 0.5 * SUM(l_quantity) FROM lineitem WHERE l_partkey = partsupp.ps_partkey " +
        "AND l_suppkey = partsupp.ps_suppkey AND l_shipdate >= '1993-01-01' " +
        "AND l_shipdate < '1994-01-01' ) ";
ss.sql(sqlstr).show();{code}
I am using MySQL as a data source to run some TPC-H tests on Spark.

This code runs successfully on Spark 2.2.0.

But on Spark 2.2.1 and Spark 2.3.0, 
 [there is an exception|https://i.stack.imgur.com/zoRoo.png]

I guess SPARK-22472 causes it.

[http://spark.apache.org/releases/spark-release-2-2-1.html]

How can I resolve it in Spark 2.3.0?






[jira] [Assigned] (SPARK-23687) Add MemoryStream

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23687:


Assignee: Apache Spark

> Add MemoryStream
> 
>
> Key: SPARK-23687
> URL: https://issues.apache.org/jira/browse/SPARK-23687
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Apache Spark
>Priority: Major
>
> We need a MemoryStream for continuous processing, both in order to write less 
> fragile tests and to eventually use existing stream tests to verify 
> functional equivalence.






[jira] [Commented] (SPARK-23687) Add MemoryStream

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399763#comment-16399763
 ] 

Apache Spark commented on SPARK-23687:
--

User 'jose-torres' has created a pull request for this issue:
https://github.com/apache/spark/pull/20828

> Add MemoryStream
> 
>
> Key: SPARK-23687
> URL: https://issues.apache.org/jira/browse/SPARK-23687
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>
> We need a MemoryStream for continuous processing, both in order to write less 
> fragile tests and to eventually use existing stream tests to verify 
> functional equivalence.






[jira] [Assigned] (SPARK-23687) Add MemoryStream

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23687:


Assignee: (was: Apache Spark)

> Add MemoryStream
> 
>
> Key: SPARK-23687
> URL: https://issues.apache.org/jira/browse/SPARK-23687
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
>
> We need a MemoryStream for continuous processing, both in order to write less 
> fragile tests and to eventually use existing stream tests to verify 
> functional equivalence.






[jira] [Created] (SPARK-23688) Refactor tests away from rate source

2018-03-14 Thread Jose Torres (JIRA)
Jose Torres created SPARK-23688:
---

 Summary: Refactor tests away from rate source
 Key: SPARK-23688
 URL: https://issues.apache.org/jira/browse/SPARK-23688
 Project: Spark
  Issue Type: Sub-task
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Jose Torres


Most continuous processing tests currently use a rate source, since that was 
what was available at the time of implementation. This forces us to do a lot of 
awkward things to work around the fact that the data in the sink is not 
perfectly predictable. We should refactor to use a memory stream once it's 
implemented.
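
As a rough illustration of why a memory stream makes sink contents predictable, the sketch below shows a test-controlled source in plain Python. The class MemorySource and its methods add_data and next_batch are hypothetical names for this example only; they are not the API of Spark's MemoryStream.

```python
class MemorySource:
    """Hypothetical in-memory source: the test decides exactly what is emitted."""

    def __init__(self):
        self._pending = []

    def add_data(self, *values):
        # Rows become visible only when the test explicitly adds them.
        self._pending.extend(values)

    def next_batch(self):
        # Hand over everything added so far and start a fresh buffer.
        batch, self._pending = self._pending, []
        return batch


source = MemorySource()
source.add_data(1, 2, 3)
first = source.next_batch()
second = source.next_batch()
```

With a rate source the number of rows in a batch depends on elapsed time, so a test can only assert on ranges; here the expected batch is known exactly from what the test added.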






[jira] [Created] (SPARK-23687) Add MemoryStream

2018-03-14 Thread Jose Torres (JIRA)
Jose Torres created SPARK-23687:
---

 Summary: Add MemoryStream
 Key: SPARK-23687
 URL: https://issues.apache.org/jira/browse/SPARK-23687
 Project: Spark
  Issue Type: Sub-task
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Jose Torres


We need a MemoryStream for continuous processing, both in order to write less 
fragile tests and to eventually use existing stream tests to verify functional 
equivalence.






[jira] [Resolved] (SPARK-22915) ML test for StructuredStreaming: spark.ml.feature, N-Z

2018-03-14 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley resolved SPARK-22915.
---
   Resolution: Fixed
Fix Version/s: 2.3.1
   2.4.0

Issue resolved by pull request 20686
[https://github.com/apache/spark/pull/20686]

> ML test for StructuredStreaming: spark.ml.feature, N-Z
> --
>
> Key: SPARK-22915
> URL: https://issues.apache.org/jira/browse/SPARK-22915
> Project: Spark
>  Issue Type: Test
>  Components: ML, Tests
>Affects Versions: 2.3.0
>Reporter: Joseph K. Bradley
>Assignee: Attila Zsolt Piros
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
>
> *For featurizers with names from N - Z*
> Task for adding Structured Streaming tests for all Models/Transformers in a 
> sub-module in spark.ml
> For an example, see LinearRegressionSuite.scala in 
> https://github.com/apache/spark/pull/19843






[jira] [Assigned] (SPARK-22915) ML test for StructuredStreaming: spark.ml.feature, N-Z

2018-03-14 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley reassigned SPARK-22915:
-

Assignee: Attila Zsolt Piros

> ML test for StructuredStreaming: spark.ml.feature, N-Z
> --
>
> Key: SPARK-22915
> URL: https://issues.apache.org/jira/browse/SPARK-22915
> Project: Spark
>  Issue Type: Test
>  Components: ML, Tests
>Affects Versions: 2.3.0
>Reporter: Joseph K. Bradley
>Assignee: Attila Zsolt Piros
>Priority: Major
> Fix For: 2.3.1, 2.4.0
>
>
> *For featurizers with names from N - Z*
> Task for adding Structured Streaming tests for all Models/Transformers in a 
> sub-module in spark.ml
> For an example, see LinearRegressionSuite.scala in 
> https://github.com/apache/spark/pull/19843






[jira] [Assigned] (SPARK-23666) Undeterministic column name with UDFs

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23666:


Assignee: (was: Apache Spark)

> Undeterministic column name with UDFs
> -
>
> Key: SPARK-23666
> URL: https://issues.apache.org/jira/browse/SPARK-23666
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Daniel Darabos
>Priority: Minor
>
> When you access structure fields in Spark SQL, the auto-generated result 
> column name includes an internal ID.
> {code:java}
> scala> import spark.implicits._
> scala> Seq(((1, 2), 3)).toDF("a", "b").createOrReplaceTempView("x")
> scala> spark.udf.register("f", (a: Int) => a)
> scala> spark.sql("select f(a._1) from x").show
> +---------------------+
> |UDF:f(a._1 AS _1#148)|
> +---------------------+
> |                    1|
> +---------------------+
> {code}
> This ID ({{#148}}) is only included for UDFs.
> {code:java}
> scala> spark.sql("select factorial(a._1) from x").show
> +-----------------------+
> |factorial(a._1 AS `_1`)|
> +-----------------------+
> |                      1|
> +-----------------------+
> {code}
> The internal ID is different on every invocation. The problem this causes for 
> us is that the schema of the SQL output is never the same:
> {code:java}
> scala> spark.sql("select f(a._1) from x").schema ==
>        spark.sql("select f(a._1) from x").schema
> Boolean = false
> {code}
> We rely on similar schema checks when reloading persisted data.
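
One way to picture the schema-check problem is to compare column names after stripping the internal ID. The sketch below is plain Python and is not part of the issue or of Spark: the helper names are hypothetical, and the pattern assumes the ID always appears as "#" followed by digits, as in the output shown above.

```python
import re

# Assumed format of the internal expression ID, e.g. "#148".
_EXPR_ID = re.compile(r"#\d+")


def normalize_column_name(name):
    """Remove internal expression IDs from an auto-generated column name."""
    return _EXPR_ID.sub("", name)


def same_columns(left, right):
    """Compare two lists of column names, ignoring internal expression IDs."""
    return ([normalize_column_name(n) for n in left]
            == [normalize_column_name(n) for n in right])


first_run = ["UDF:f(a._1 AS _1#148)"]
second_run = ["UDF:f(a._1 AS _1#152)"]
stable = same_columns(first_run, second_run)
```

Giving the column an explicit alias in the query avoids depending on the auto-generated name at all, which is usually the simpler option when the query text is under your control.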






[jira] [Commented] (SPARK-23666) Undeterministic column name with UDFs

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399673#comment-16399673
 ] 

Apache Spark commented on SPARK-23666:
--

User 'maropu' has created a pull request for this issue:
https://github.com/apache/spark/pull/20827

> Undeterministic column name with UDFs
> -
>
> Key: SPARK-23666
> URL: https://issues.apache.org/jira/browse/SPARK-23666
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Daniel Darabos
>Priority: Minor
>
> When you access structure fields in Spark SQL, the auto-generated result 
> column name includes an internal ID.
> {code:java}
> scala> import spark.implicits._
> scala> Seq(((1, 2), 3)).toDF("a", "b").createOrReplaceTempView("x")
> scala> spark.udf.register("f", (a: Int) => a)
> scala> spark.sql("select f(a._1) from x").show
> +---------------------+
> |UDF:f(a._1 AS _1#148)|
> +---------------------+
> |                    1|
> +---------------------+
> {code}
> This ID ({{#148}}) is only included for UDFs.
> {code:java}
> scala> spark.sql("select factorial(a._1) from x").show
> +-----------------------+
> |factorial(a._1 AS `_1`)|
> +-----------------------+
> |                      1|
> +-----------------------+
> {code}
> The internal ID is different on every invocation. The problem this causes for 
> us is that the schema of the SQL output is never the same:
> {code:java}
> scala> spark.sql("select f(a._1) from x").schema ==
>        spark.sql("select f(a._1) from x").schema
> Boolean = false
> {code}
> We rely on similar schema checks when reloading persisted data.
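Until the generated name is made deterministic, one possible workaround on the caller's side is to strip the expression ID before comparing schemas. The sketch below is plain Python and does not use Spark. It assumes a schema can be represented as a list of (column name, type name) pairs and that the ID always has the form `#` followed by digits, as in `_1#148` above. `strip_expr_ids` and `schemas_match` are hypothetical helper names, not Spark APIs, and the ID `#152` is made up for the example.

```python
import re

# Assumption: the internal ID looks like "#148" (a hash followed by digits).
_EXPR_ID = re.compile(r"#\d+")


def strip_expr_ids(column_name):
    """Remove every '#<digits>' marker from an auto-generated column name."""
    return _EXPR_ID.sub("", column_name)


def schemas_match(left, right):
    """Compare two schemas, given as lists of (name, type) pairs, ignoring IDs."""
    def normalize(schema):
        return [(strip_expr_ids(name), dtype) for name, dtype in schema]
    return normalize(left) == normalize(right)


# Two hypothetical invocations of the same query; only the ID differs.
first_run = [("UDF:f(a._1 AS _1#148)", "int")]
second_run = [("UDF:f(a._1 AS _1#152)", "int")]

print(first_run == second_run)               # plain comparison of the raw names
print(schemas_match(first_run, second_run))  # comparison with IDs stripped
```

A column whose real name happens to contain `#` followed by digits would be altered too, so this is only suitable where such names cannot occur.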






[jira] [Assigned] (SPARK-23666) Undeterministic column name with UDFs

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23666:


Assignee: Apache Spark

> Undeterministic column name with UDFs
> -
>
> Key: SPARK-23666
> URL: https://issues.apache.org/jira/browse/SPARK-23666
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Daniel Darabos
>Assignee: Apache Spark
>Priority: Minor
>
> When you access structure fields in Spark SQL, the auto-generated result 
> column name includes an internal ID.
> {code:java}
> scala> import spark.implicits._
> scala> Seq(((1, 2), 3)).toDF("a", "b").createOrReplaceTempView("x")
> scala> spark.udf.register("f", (a: Int) => a)
> scala> spark.sql("select f(a._1) from x").show
> +---------------------+
> |UDF:f(a._1 AS _1#148)|
> +---------------------+
> |                    1|
> +---------------------+
> {code}
> This ID ({{#148}}) is only included for UDFs.
> {code:java}
> scala> spark.sql("select factorial(a._1) from x").show
> +-----------------------+
> |factorial(a._1 AS `_1`)|
> +-----------------------+
> |                      1|
> +-----------------------+
> {code}
> The internal ID is different on every invocation. The problem this causes for 
> us is that the schema of the SQL output is never the same:
> {code:java}
> scala> spark.sql("select f(a._1) from x").schema ==
>spark.sql("select f(a._1) from x").schema
> Boolean = false
> {code}
> We rely on similar schema checks when reloading persisted data.






[jira] [Commented] (SPARK-23666) Undeterministic column name with UDFs

2018-03-14 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399633#comment-16399633
 ] 

Takeshi Yamamuro commented on SPARK-23666:
--

This is just a bug, so I'll make a pr later.

> Undeterministic column name with UDFs
> -
>
> Key: SPARK-23666
> URL: https://issues.apache.org/jira/browse/SPARK-23666
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Daniel Darabos
>Priority: Minor
>
> When you access structure fields in Spark SQL, the auto-generated result 
> column name includes an internal ID.
> {code:java}
> scala> import spark.implicits._
> scala> Seq(((1, 2), 3)).toDF("a", "b").createOrReplaceTempView("x")
> scala> spark.udf.register("f", (a: Int) => a)
> scala> spark.sql("select f(a._1) from x").show
> +---------------------+
> |UDF:f(a._1 AS _1#148)|
> +---------------------+
> |                    1|
> +---------------------+
> {code}
> This ID ({{#148}}) is only included for UDFs.
> {code:java}
> scala> spark.sql("select factorial(a._1) from x").show
> +-----------------------+
> |factorial(a._1 AS `_1`)|
> +-----------------------+
> |                      1|
> +-----------------------+
> {code}
> The internal ID is different on every invocation. The problem this causes for 
> us is that the schema of the SQL output is never the same:
> {code:java}
> scala> spark.sql("select f(a._1) from x").schema ==
>spark.sql("select f(a._1) from x").schema
> Boolean = false
> {code}
> We rely on similar schema checks when reloading persisted data.






[jira] [Commented] (SPARK-19842) Informational Referential Integrity Constraints Support in Spark

2018-03-14 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399586#comment-16399586
 ] 

Takeshi Yamamuro commented on SPARK-19842:
--

I think he just wants to make the Google doc editable for reviews.

> Informational Referential Integrity Constraints Support in Spark
> 
>
> Key: SPARK-19842
> URL: https://issues.apache.org/jira/browse/SPARK-19842
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Ioana Delaney
>Priority: Major
> Attachments: InformationalRIConstraints.doc
>
>
> *Informational Referential Integrity Constraints Support in Spark*
> This work proposes support for _informational primary key_ and _foreign key 
> (referential integrity) constraints_ in Spark. The main purpose is to open up 
> an area of query optimization techniques that rely on referential integrity 
> constraints semantics. 
> An _informational_ or _statistical constraint_ is a constraint such as a 
> _unique_, _primary key_, _foreign key_, or _check constraint_, that can be 
> used by Spark to improve query performance. Informational constraints are not 
> enforced by the Spark SQL engine; rather, they are used by Catalyst to 
> optimize the query processing. They provide semantics information that allows 
> Catalyst to rewrite queries to eliminate joins, push down aggregates, remove 
> unnecessary Distinct operations, and perform a number of other optimizations. 
> Informational constraints are primarily targeted to applications that load 
> and analyze data that originated from a data warehouse. For such 
> applications, the conditions for a given constraint are known to be true, so 
> the constraint does not need to be enforced during data load operations. 
> The attached document covers constraint definition, metastore storage, 
> constraint validation, and maintenance. The document shows many examples of 
> query performance improvements that utilize referential integrity constraints 
> and can be implemented in Spark.
> Link to the google doc: 
> [InformationalRIConstraints|https://docs.google.com/document/d/17r-cOqbKF7Px0xb9L7krKg2-RQB_gD2pxOmklm-ehsw/edit]






[jira] [Commented] (SPARK-22674) PySpark breaks serialization of namedtuple subclasses

2018-03-14 Thread Sergei Lebedev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399488#comment-16399488
 ] 

Sergei Lebedev commented on SPARK-22674:


[~jonasamrich] I see your point, but the current implementation has known 
limitations, which make it "breaking" as is.

> PySpark breaks serialization of namedtuple subclasses
> -
>
> Key: SPARK-22674
> URL: https://issues.apache.org/jira/browse/SPARK-22674
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Jonas Amrich
>Priority: Major
>
> PySpark monkey patches the namedtuple class to make it serializable; however, 
> this breaks serialization of its subclasses. With the current implementation, 
> any subclass will be serialized (and deserialized) as its parent namedtuple. 
> Consider this code, which will fail with {{AttributeError: 'Point' object has 
> no attribute 'sum'}}:
> {code}
> from collections import namedtuple
> Point = namedtuple("Point", "x y")
> class PointSubclass(Point):
>     def sum(self):
>         return self.x + self.y
> rdd = spark.sparkContext.parallelize([[PointSubclass(1, 1)]])
> rdd.collect()[0][0].sum()
> {code}
> Moreover, as PySpark hijacks all namedtuples in the main module, importing 
> pyspark breaks serialization of namedtuple subclasses even in code which is 
> not related to spark / distributed execution. I don't see any clean solution 
> to this; a possible workaround may be to limit serialization hack only to 
> direct namedtuple subclasses like in 
> https://github.com/JonasAmrich/spark/commit/f3efecee28243380ecf6657fe54e1a165c1b7204
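The failure mode can be reproduced without Spark. The following is a simplified stand-in for the patching described above, not PySpark's actual code: `hack_namedtuple` and `restore` are hypothetical names, and the round trip calls `__reduce__` directly instead of going through a pickler.

```python
from collections import namedtuple


def restore(name, fields, values):
    """Rebuild an instance from a class name, field names and values only."""
    cls = namedtuple(name, fields)
    return cls(*values)


def hack_namedtuple(cls):
    """Make cls serialize as (name, fields, values); subclasses inherit this."""
    name, fields = cls.__name__, cls._fields

    def __reduce__(self):
        # 'name' and 'fields' were captured from the patched parent class,
        # so a subclass instance is reduced as if it were the parent.
        return (restore, (name, fields, tuple(self)))

    cls.__reduce__ = __reduce__
    return cls


Point = hack_namedtuple(namedtuple("Point", "x y"))


class PointSubclass(Point):
    def sum(self):
        return self.x + self.y


original = PointSubclass(1, 1)
rebuild, args = original.__reduce__()  # what a pickler would record
roundtrip = rebuild(*args)             # what an unpickler would reconstruct

print(type(original).__name__, original.sum())
print(type(roundtrip).__name__, hasattr(roundtrip, "sum"))
```

The reconstructed object keeps the field values but is an instance of a plain `Point`, which is why calling `sum()` on it raises the `AttributeError` quoted in the description.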






[jira] [Created] (SPARK-23686) Make better usage of org.apache.spark.ml.util.Instrumentation

2018-03-14 Thread Bago Amirbekian (JIRA)
Bago Amirbekian created SPARK-23686:
---

 Summary: Make better usage of 
org.apache.spark.ml.util.Instrumentation
 Key: SPARK-23686
 URL: https://issues.apache.org/jira/browse/SPARK-23686
 Project: Spark
  Issue Type: Improvement
  Components: ML
Affects Versions: 2.3.0
Reporter: Bago Amirbekian


This Jira is a bit high level and might require subtasks or other jiras for 
more specific tasks.

I've noticed that we don't make the best use of the instrumentation class. 
Specifically, we sometimes bypass the instrumentation class and use the debugger 
instead. For example, 
[https://github.com/apache/spark/blob/9b9827759af2ca3eea146a6032f9165f640ce152/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala#L143]

Also there are some things that might be useful to log in the instrumentation 
class that we currently don't. For example:

number of training examples
mean/var of label (regression)

I know computing these things can be expensive in some cases, but especially 
when this data is already available we can log it for free. For example, 
Logistic Regression Summarizer computes some useful data including numRows that 
we don't log.
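
As an illustration of the kind of summary that could be logged, the sketch below computes the number of examples and the mean and variance of the label in a single pass (Welford's algorithm) and writes them to a logger. It is plain Python rather than Spark code; `summarize_labels`, `log_training_summary` and the log format are hypothetical and are not part of org.apache.spark.ml.util.Instrumentation.

```python
import logging

logger = logging.getLogger("instrumentation-sketch")


def summarize_labels(labels):
    """Return (count, mean, sample variance) of labels in one pass."""
    count, mean, m2 = 0, 0.0, 0.0
    for y in labels:
        count += 1
        delta = y - mean
        mean += delta / count
        m2 += delta * (y - mean)  # uses the updated mean (Welford's update)
    variance = m2 / (count - 1) if count > 1 else 0.0
    return count, mean, variance


def log_training_summary(labels):
    """Log the summary and return it so callers can reuse the values."""
    count, mean, variance = summarize_labels(labels)
    logger.info("numExamples=%d labelMean=%.4f labelVariance=%.4f",
                count, mean, variance)
    return count, mean, variance


logging.basicConfig(level=logging.INFO)
log_training_summary([1.0, 2.0, 3.0, 4.0])
```

A single-pass update like this is one way to keep the cost low when the labels are already being iterated over for training.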

 






[jira] [Commented] (SPARK-21642) Use FQDN for DRIVER_HOST_ADDRESS instead of ip address

2018-03-14 Thread John Zhuge (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399376#comment-16399376
 ] 

John Zhuge commented on SPARK-21642:


Thanks [~tanakahda] and [~cloud_fan] for adding this feature to support SSL. 
Unfortunately it breaks my environment, where SSL is not used and an FQDN cannot 
be set up easily. What do you recommend? Is it possible to switch to the old 
behavior when SSL is not enabled? If this is feasible, I can create a PR.

> Use FQDN for DRIVER_HOST_ADDRESS instead of ip address
> --
>
> Key: SPARK-21642
> URL: https://issues.apache.org/jira/browse/SPARK-21642
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.2.0
>Reporter: Aki Tanaka
>Assignee: Aki Tanaka
>Priority: Major
> Fix For: 2.3.0
>
>
> In the current implementation, the IP address of the driver host is set to 
> DRIVER_HOST_ADDRESS [1]. This becomes a problem when we enable SSL using the 
> "spark.ssl.enabled", "spark.ssl.trustStore" and "spark.ssl.keyStore" 
> properties. When these properties are configured, the Spark web UI is launched 
> with SSL enabled and the HTTPS server uses the custom SSL certificate 
> configured in these properties.
> In this case, the client gets a javax.net.ssl.SSLPeerUnverifiedException 
> when it accesses the Spark web UI, because it fails to verify the SSL 
> certificate (the Common Name of the SSL cert does not match 
> DRIVER_HOST_ADDRESS).
> To avoid the exception, we should use the FQDN of the driver host for 
> DRIVER_HOST_ADDRESS.
> [1]  
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala#L222
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L942
> Error message that client gets when the client accesses spark web ui:
> javax.net.ssl.SSLPeerUnverifiedException: Certificate for <10.102.138.239> 
> doesn't match any of the subject alternative names: []
> {code:java}
> $ spark-submit /path/to/jar
> ..
> 17/08/04 14:48:07 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 17/08/04 14:48:07 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
> http://10.43.3.8:4040
> $ curl -I http://10.43.3.8:4040
> HTTP/1.1 302 Found
> Date: Fri, 04 Aug 2017 14:48:20 GMT
> Location: https://10.43.3.8:4440/
> Content-Length: 0
> Server: Jetty(9.2.z-SNAPSHOT)
> $ curl -v https://10.43.3.8:4440
> * Rebuilt URL to: https://10.43.3.8:4440/
> *   Trying 10.43.3.8...
> * TCP_NODELAY set
> * Connected to 10.43.3.8 (10.43.3.8) port 4440 (#0)
> * Initializing NSS with certpath: sql:/etc/pki/nssdb
> *   CAfile: /etc/pki/tls/certs/ca-bundle.crt
>   CApath: none
> * Server certificate:
> * subject: CN=*.example.com,OU=MyDept,O=MyOrg,L=Area,C=US
> * start date: Jun 12 00:05:02 2017 GMT
> * expire date: Jun 12 00:05:02 2018 GMT
> * common name: *.example.com
> * issuer: CN=*.example.com,OU=MyDept,O=MyOrg,L=Area,C=US
> {code}
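
To make the mismatch concrete, here is a deliberately simplified sketch of checking a host against a certificate's Common Name. It is not the JDK's or curl's verification logic: `matches_common_name` is a hypothetical helper, the rule that a wildcard covers exactly one leading label is a simplification, and `driver.example.com` is an assumed FQDN used only for illustration.

```python
import ipaddress


def is_ip_literal(host):
    """True if host parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(host)
        return True
    except ValueError:
        return False


def matches_common_name(host, common_name):
    """Simplified check: exact match, or '*' standing in for the first label."""
    if is_ip_literal(host):
        # A DNS-style name such as '*.example.com' never matches an IP address.
        return False
    host_labels = host.lower().split(".")
    cn_labels = common_name.lower().split(".")
    if len(host_labels) != len(cn_labels):
        return False
    first_ok = cn_labels[0] == "*" or cn_labels[0] == host_labels[0]
    return first_ok and cn_labels[1:] == host_labels[1:]


print(matches_common_name("10.43.3.8", "*.example.com"))           # IP address from the log
print(matches_common_name("driver.example.com", "*.example.com"))  # assumed FQDN
```

Under these assumptions the IP address can never satisfy the wildcard certificate, while a host name under `example.com` can, which is the motivation for advertising the FQDN.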






[jira] [Commented] (SPARK-2489) Unsupported parquet datatype optional fixed_len_byte_array

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399372#comment-16399372
 ] 

Apache Spark commented on SPARK-2489:
-

User 'aws-awinstan' has created a pull request for this issue:
https://github.com/apache/spark/pull/20826

> Unsupported parquet datatype optional fixed_len_byte_array
> --
>
> Key: SPARK-2489
> URL: https://issues.apache.org/jira/browse/SPARK-2489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.1.0, 2.2.0
>Reporter: Pei-Lun Lee
>Priority: Major
>
> tested against commit 9fe693b5
> {noformat}
> scala> sqlContext.parquetFile("/tmp/foo")
> java.lang.RuntimeException: Unsupported parquet datatype optional 
> fixed_len_byte_array(4) b
>   at scala.sys.package$.error(package.scala:27)
>   at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:58)
>   at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:109)
>   at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:282)
>   at 
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes$1.apply(ParquetTypes.scala:279)
> {noformat}
> example avro schema
> {noformat}
> protocol Test {
> fixed Bytes4(4);
> record Foo {
> union {null, Bytes4} b;
> }
> }
> {noformat}






[jira] [Commented] (SPARK-23685) Spark Structured Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2018-03-14 Thread sirisha (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399140#comment-16399140
 ] 

sirisha commented on SPARK-23685:
-

I am not able to assign it to myself. Can somebody assign it to me?

> Spark Structured Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive 
> Offsets (i.e. Log Compaction)
> -
>
> Key: SPARK-23685
> URL: https://issues.apache.org/jira/browse/SPARK-23685
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: sirisha
>Priority: Major
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaSourceRDD & CachedKafkaConsumer assumes that the next offset will always 
> be just an increment of 1. If not, it throws the exception below:
>  
> "Cannot fetch records in [5589, 5693) (GroupId: XXX, TopicPartition:). 
> Some data may have been lost because they are not available in Kafka any 
> more; either the data was aged out by Kafka or the topic may have been 
> deleted before all the data in the topic was processed. If you don't want 
> your streaming query to fail on such cases, set the source option 
> "failOnDataLoss" to "false". "
>  
> FYI: This bug is related to https://issues.apache.org/jira/browse/SPARK-17147
>  
>  






[jira] [Updated] (SPARK-23685) Spark Structured Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2018-03-14 Thread sirisha (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sirisha updated SPARK-23685:

Description: 
When Kafka does log compaction, offsets often end up with gaps, meaning the next 
requested offset will frequently not be offset+1. The logic in 
KafkaSourceRDD & CachedKafkaConsumer assumes that the next offset will always 
be just an increment of 1. If not, it throws the exception below:

 

"Cannot fetch records in [5589, 5693) (GroupId: XXX, TopicPartition:). Some 
data may have been lost because they are not available in Kafka any more; 
either the data was aged out by Kafka or the topic may have been deleted before 
all the data in the topic was processed. If you don't want your streaming query 
to fail on such cases, set the source option "failOnDataLoss" to "false". "

 

FYI: This bug is related to https://issues.apache.org/jira/browse/SPARK-17147

 

 

  was:
When Kafka does log compaction, offsets often end up with gaps, meaning the next 
requested offset will frequently not be offset+1. The logic in 
KafkaSourceRDD & CachedKafkaConsumer assumes that the next offset will always 
be just an increment of 1. If not, it throws the exception below:

 

"Cannot fetch records in [5589, 5693) (GroupId: XXX, TopicPartition:). Some 
data may have been lost because they are not available in Kafka any more; 
either the data was aged out by Kafka or the topic may have been deleted before 
all the data in the topic was processed. If you don't want your streaming query 
to fail on such cases, set the source option "failOnDataLoss" to "false". "

 

 


> Spark Structured Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive 
> Offsets (i.e. Log Compaction)
> -
>
> Key: SPARK-23685
> URL: https://issues.apache.org/jira/browse/SPARK-23685
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: sirisha
>Priority: Major
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the 
> next requested offset will frequently not be offset+1. The logic in 
> KafkaSourceRDD & CachedKafkaConsumer assumes that the next offset will always 
> be just an increment of 1. If not, it throws the exception below:
>  
> "Cannot fetch records in [5589, 5693) (GroupId: XXX, TopicPartition:). 
> Some data may have been lost because they are not available in Kafka any 
> more; either the data was aged out by Kafka or the topic may have been 
> deleted before all the data in the topic was processed. If you don't want 
> your streaming query to fail on such cases, set the source option 
> "failOnDataLoss" to "false". "
>  
> FYI: This bug is related to https://issues.apache.org/jira/browse/SPARK-17147
>  
>  






[jira] [Created] (SPARK-23685) Spark Structured Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets (i.e. Log Compaction)

2018-03-14 Thread sirisha (JIRA)
sirisha created SPARK-23685:
---

 Summary: Spark Structured Streaming Kafka 0.10 Consumer Can't 
Handle Non-consecutive Offsets (i.e. Log Compaction)
 Key: SPARK-23685
 URL: https://issues.apache.org/jira/browse/SPARK-23685
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.2.0
Reporter: sirisha


When Kafka does log compaction, offsets often end up with gaps, meaning the next 
requested offset will frequently not be offset+1. The logic in 
KafkaSourceRDD & CachedKafkaConsumer assumes that the next offset will always 
be just an increment of 1. If not, it throws the exception below:

 

"Cannot fetch records in [5589, 5693) (GroupId: XXX, TopicPartition:). Some 
data may have been lost because they are not available in Kafka any more; 
either the data was aged out by Kafka or the topic may have been deleted before 
all the data in the topic was processed. If you don't want your streaming query 
to fail on such cases, set the source option "failOnDataLoss" to "false". "
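
The difference between the two assumptions can be sketched in a few lines of plain Python. This is not the KafkaSourceRDD or CachedKafkaConsumer code: `read_strict` and `read_tolerant` are hypothetical functions, and the records are made-up (offset, value) pairs with a gap of the kind compaction might leave.

```python
def read_strict(records, start, end):
    """Read [start, end), assuming each offset is exactly the previous one + 1."""
    expected = start
    values = []
    for offset, value in records:
        if offset >= end:
            break
        if offset != expected:
            raise ValueError("expected offset %d but got %d" % (expected, offset))
        values.append(value)
        expected = offset + 1
    return values


def read_tolerant(records, start, end):
    """Read [start, end), accepting gaps as long as offsets keep increasing."""
    next_allowed = start
    values = []
    for offset, value in records:
        if offset >= end:
            break
        if offset < next_allowed:
            raise ValueError("offset %d went backwards" % offset)
        values.append(value)
        next_allowed = offset + 1
    return values


# Hypothetical compacted log: offsets 5591 and 5592 were removed.
compacted = [(5589, "a"), (5590, "b"), (5593, "c")]

print(read_tolerant(compacted, 5589, 5594))
try:
    read_strict(compacted, 5589, 5594)
except ValueError as err:
    print("strict reader failed:", err)
```

The tolerant reader still rejects offsets that move backwards, so it only relaxes the "exactly +1" assumption rather than dropping ordering checks altogether.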

 

 






[jira] [Commented] (SPARK-20236) Overwrite a partitioned data source table should only overwrite related partitions

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399044#comment-16399044
 ] 

Apache Spark commented on SPARK-20236:
--

User 'steveloughran' has created a pull request for this issue:
https://github.com/apache/spark/pull/20824

> Overwrite a partitioned data source table should only overwrite related 
> partitions
> --
>
> Key: SPARK-20236
> URL: https://issues.apache.org/jira/browse/SPARK-20236
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Wenchen Fan
>Assignee: Wenchen Fan
>Priority: Major
>  Labels: releasenotes
> Fix For: 2.3.0
>
>
> When we overwrite a partitioned data source table, currently Spark will 
> truncate the entire table to write new data, or truncate a bunch of 
> partitions according to the given static partitions.
> For example, {{INSERT OVERWRITE tbl ...}} will truncate the entire table, and 
> {{INSERT OVERWRITE tbl PARTITION (a=1, b)}} will truncate all the partitions 
> that start with {{a=1}}.
> This behavior is reasonable in that we know which partitions will be 
> overwritten before runtime. However, Hive behaves differently: it only 
> overwrites related partitions, e.g. {{INSERT OVERWRITE tbl SELECT 
> 1,2,3}} will only overwrite partition {{a=2, b=3}}, assuming {{tbl}} has only 
> one data column and is partitioned by {{a}} and {{b}}.
> It seems better if we can follow hive's behavior.
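
The two behaviours can be compared with a small plain-Python model. It is only a sketch: the table is modelled as a dict from partition values (a, b) to lists of rows, and `overwrite_static` and `overwrite_dynamic` are hypothetical names rather than Spark or Hive functions.

```python
def overwrite_static(table, new_rows, static_spec):
    """Drop every partition matching static_spec, then write new_rows.

    static_spec maps a partition column index to a fixed value,
    e.g. {0: 1} for PARTITION (a=1, b); {} means the whole table.
    """
    kept = {part: list(rows) for part, rows in table.items()
            if any(part[i] != v for i, v in static_spec.items())}
    for part, row in new_rows:
        kept.setdefault(part, []).append(row)
    return kept


def overwrite_dynamic(table, new_rows):
    """Replace only the partitions that actually receive new rows."""
    result = dict(table)
    touched = {part for part, _ in new_rows}
    for part in touched:
        result[part] = []
    for part, row in new_rows:
        result[part].append(row)
    return result


# Table with one data column, partitioned by (a, b).
tbl = {(1, 1): [10], (2, 3): [20], (2, 4): [30]}
# INSERT OVERWRITE tbl SELECT 1,2,3 -> data value 1 goes to partition a=2, b=3.
new_rows = [((2, 3), 1)]

print(overwrite_static(tbl, new_rows, {}))  # whole table truncated first
print(overwrite_dynamic(tbl, new_rows))     # only partition (2, 3) replaced
```

In this model the static variant loses partitions (1, 1) and (2, 4), while the dynamic variant leaves them untouched.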






[jira] [Created] (SPARK-23684) mode append function not working

2018-03-14 Thread Evan Zamir (JIRA)
Evan Zamir created SPARK-23684:
--

 Summary: mode append function not working 
 Key: SPARK-23684
 URL: https://issues.apache.org/jira/browse/SPARK-23684
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 2.2.0
Reporter: Evan Zamir


{{df.write.mode('append').jdbc(url, table, properties=\{"driver": 
"org.postgresql.Driver"}) }}

produces the following error and does not write to existing table:

{{2018-03-14 11:00:08,332 root ERROR An error occurred while calling 
o894.jdbc.}}
{{: scala.MatchError: null}}
{{ at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:62)}}
{{ at 
org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)}}
{{ at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)}}
{{ at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)}}
{{ at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)}}
{{ at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)}}
{{ at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)}}
{{ at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)}}
{{ at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)}}
{{ at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)}}
{{ at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)}}
{{ at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)}}
{{ at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)}}
{{ at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)}}
{{ at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)}}
{{ at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)}}
{{ at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:461)}}
{{ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)}}
{{ at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)}}
{{ at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)}}
{{ at java.lang.reflect.Method.invoke(Method.java:498)}}
{{ at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)}}
{{ at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)}}
{{ at py4j.Gateway.invoke(Gateway.java:280)}}
{{ at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)}}
{{ at py4j.commands.CallCommand.execute(CallCommand.java:79)}}
{{ at py4j.GatewayConnection.run(GatewayConnection.java:214)}}
{{ at java.lang.Thread.run(Thread.java:745)}}

However,

{{df.write.jdbc(url, table, properties=\{"driver": 
"org.postgresql.Driver"},mode='append')}}

does not produce an error and adds a row to an existing table.






[jira] [Comment Edited] (SPARK-23499) Mesos Cluster Dispatcher should support priority queues to submit drivers

2018-03-14 Thread Pascal GILLET (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398999#comment-16398999
 ] 

Pascal GILLET edited comment on SPARK-23499 at 3/14/18 6:04 PM:


Yes, I have thought of that design. It would be more consistent that way. But 
conversely, the user can decide to set up a simple policy that would be 
applicable at the dispatcher level only: if no (weighted) role is declared on 
the Mesos side, the drivers can still be prioritized in the dispatcher's queue. 
Once the drivers are executed on Mesos, they are given equal resources.

Another possibility is to add a new boolean property 
_spark.mesos.dispatcher.queue.mapToMesosWeights_ to effectively map the 
drivers' priorities to the Mesos roles (false by default). If true, the 
_spark.mesos.dispatcher.queue.[QueueName]_ cannot be used anymore, and the 
_spark.mesos.role_ gives the priority of the driver in the queue.

In this way, the user can decide which policy is the best. What do you think?


was (Author: pgillet):
Yes, I have thought of that design. It would be more consistent that way. But 
conversely, the user can decide to set up a simple policy that would be 
applicable at the dispatcher level only: if no (weighted) role is declared on 
the Mesos side, the drivers can still be prioritized in the dispatcher's queue. 
Once the drivers are executed on Mesos, they are given equal resources.

Another possibility is to add a new boolean property 
_spark.mesos.dispatcher.queue.mapToMesosWeights_ to effectively map the 
drivers' priorities to the Mesos roles (false by default). If true, the 
_spark.mesos.dispatcher.queue.[QueueName]_ cannot be used anymore.

In this way, the user can decide which policy is the best. What do you think?

> Mesos Cluster Dispatcher should support priority queues to submit drivers
> -
>
> Key: SPARK-23499
> URL: https://issues.apache.org/jira/browse/SPARK-23499
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.1
>Reporter: Pascal GILLET
>Priority: Major
> Attachments: Screenshot from 2018-02-28 17-22-47.png
>
>
> As for Yarn, Mesos users should be able to specify priority queues to define 
> a workload management policy for queued drivers in the Mesos Cluster 
> Dispatcher.
> Submitted drivers are *currently* kept in order of their submission: the 
> first driver added to the queue will be the first one to be executed (FIFO).
> Each driver could have a "priority" associated with it. A driver with high 
> priority is served (Mesos resources) before a driver with low priority. If 
> two drivers have the same priority, they are served according to their submit 
> date in the queue.
> To set up such priority queues, the following changes are proposed:
>  * The Mesos Cluster Dispatcher can optionally be configured with the 
> _spark.mesos.dispatcher.queue.[QueueName]_ property. This property takes a 
> float as value. This adds a new queue named _QueueName_ for submitted drivers 
> with the specified priority.
>  Higher numbers indicate higher priority.
>  The user can then specify multiple queues.
>  * A driver can be submitted to a specific queue with 
> _spark.mesos.dispatcher.queue_. This property takes the name of a queue 
> previously declared in the dispatcher as value.
> By default, the dispatcher has a single "default" queue with 0.0 priority 
> (cannot be overridden). If none of the properties above are specified, the 
> behavior is the same as the current one (i.e. simple FIFO).
> Additionally, it is possible to implement a consistent and overall workload 
> management policy throughout the lifecycle of drivers by mapping these 
> priority queues to weighted Mesos roles if any (i.e. from the QUEUED state in 
> the dispatcher to the final states in the Mesos cluster), and by specifying a 
> _spark.mesos.role_ along with a _spark.mesos.dispatcher.queue_ when 
> submitting an application.
> For example, with the URGENT Mesos role:
> {code:java}
> # Conf on the dispatcher side
> spark.mesos.dispatcher.queue.URGENT=1.0
> # Conf on the driver side
> spark.mesos.dispatcher.queue=URGENT
> spark.mesos.role=URGENT
> {code}
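
The proposed ordering (higher priority first, FIFO among equal priorities) can be sketched with a heap. This is plain Python, not the dispatcher's implementation: `DriverQueue` is a hypothetical class, and the ROUTINE queue with priority 0.5 and the driver IDs are made up for the example; only URGENT=1.0 and the 0.0 default come from the proposal.

```python
import heapq
import itertools


class DriverQueue:
    """Serve drivers by queue priority, then by submission order."""

    def __init__(self, queue_priorities):
        # The "default" queue always exists with priority 0.0, as proposed,
        # and cannot be overridden by the supplied configuration.
        self._priorities = dict(queue_priorities)
        self._priorities["default"] = 0.0
        self._heap = []
        self._submissions = itertools.count()

    def submit(self, driver_id, queue="default"):
        priority = self._priorities[queue]  # KeyError for an undeclared queue
        # Negate the priority because heapq pops the smallest entry first;
        # the submission counter breaks ties in FIFO order.
        heapq.heappush(self._heap, (-priority, next(self._submissions), driver_id))

    def next_driver(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


dispatcher = DriverQueue({"URGENT": 1.0, "ROUTINE": 0.5})
dispatcher.submit("driver-1")
dispatcher.submit("driver-2", queue="ROUTINE")
dispatcher.submit("driver-3", queue="URGENT")
dispatcher.submit("driver-4", queue="URGENT")

order = [dispatcher.next_driver() for _ in range(len(dispatcher))]
print(order)
```

With no queues configured, every driver lands in the default queue and the submission counter alone decides the order, which matches the current FIFO behavior.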
>  






[jira] [Commented] (SPARK-23499) Mesos Cluster Dispatcher should support priority queues to submit drivers

2018-03-14 Thread Pascal GILLET (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398999#comment-16398999
 ] 

Pascal GILLET commented on SPARK-23499:
---

Yes, I have thought of that design. It would be more consistent that way. But 
conversely, the user can decide to set up a simple policy that would be 
applicable at the dispatcher level only: if no (weighted) role is declared on 
the Mesos side, the drivers can still be prioritized in the dispatcher's queue. 
Once the drivers are executed on Mesos, they are given equal resources.

Another possibility is to add a new boolean property 
_spark.mesos.dispatcher.queue.mapToMesosWeights_ to effectively map the 
drivers' priorities to the Mesos roles (false by default). If true, the 
_spark.mesos.dispatcher.queue.[QueueName]_ cannot be used anymore.

In this way, the user can decide which policy is the best. What do you think?

> Mesos Cluster Dispatcher should support priority queues to submit drivers
> -
>
> Key: SPARK-23499
> URL: https://issues.apache.org/jira/browse/SPARK-23499
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.1
>Reporter: Pascal GILLET
>Priority: Major
> Attachments: Screenshot from 2018-02-28 17-22-47.png
>
>
> As for Yarn, Mesos users should be able to specify priority queues to define 
> a workload management policy for queued drivers in the Mesos Cluster 
> Dispatcher.
> Submitted drivers are *currently* kept in order of their submission: the 
> first driver added to the queue will be the first one to be executed (FIFO).
> Each driver could have a "priority" associated with it. A driver with high 
> priority is served (Mesos resources) before a driver with low priority. If 
> two drivers have the same priority, they are served according to their submit 
> date in the queue.
> To set up such priority queues, the following changes are proposed:
>  * The Mesos Cluster Dispatcher can optionally be configured with the 
> _spark.mesos.dispatcher.queue.[QueueName]_ property. This property takes a 
> float as value. This adds a new queue named _QueueName_ for submitted drivers 
> with the specified priority.
>  Higher numbers indicate higher priority.
>  The user can then specify multiple queues.
>  * A driver can be submitted to a specific queue with 
> _spark.mesos.dispatcher.queue_. This property takes the name of a queue 
> previously declared in the dispatcher as value.
> By default, the dispatcher has a single "default" queue with 0.0 priority 
> (cannot be overridden). If none of the properties above are specified, the 
> behavior is the same as the current one (i.e. simple FIFO).
> Additionally, it is possible to implement a consistent and overall workload 
> management policy throughout the lifecycle of drivers by mapping these 
> priority queues to weighted Mesos roles if any (i.e. from the QUEUED state in 
> the dispatcher to the final states in the Mesos cluster), and by specifying a 
> _spark.mesos.role_ along with a _spark.mesos.dispatcher.queue_ when 
> submitting an application.
> For example, with the URGENT Mesos role:
> {code:java}
> # Conf on the dispatcher side
> spark.mesos.dispatcher.queue.URGENT=1.0
> # Conf on the driver side
> spark.mesos.dispatcher.queue=URGENT
> spark.mesos.role=URGENT
> {code}
>  
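
The ordering proposed in the description (higher priority first, ties broken by submission order) can be sketched as follows. This is only an illustration in Python, not the dispatcher's code: `DriverQueue`, `submit` and `next_driver` are hypothetical names, and the `queues` argument stands in for the `spark.mesos.dispatcher.queue.[QueueName]` properties.

```python
import heapq
import itertools


class DriverQueue:
    """Toy model of the proposed dispatcher queue (all names are hypothetical)."""

    def __init__(self, queues=None):
        # Queue name -> priority; the "default" queue always exists with 0.0.
        self.priorities = {"default": 0.0}
        for name, priority in (queues or {}).items():
            if name == "default":
                raise ValueError("the default queue cannot be overridden")
            self.priorities[name] = float(priority)
        self._heap = []
        # A monotonically increasing counter stands in for the submit date.
        self._seq = itertools.count()

    def submit(self, driver_id, queue="default"):
        if queue not in self.priorities:
            raise KeyError(f"unknown queue: {queue}")
        # heapq is a min-heap, so the priority is negated to pop the highest
        # first; the counter keeps FIFO order among equal priorities.
        entry = (-self.priorities[queue], next(self._seq), driver_id)
        heapq.heappush(self._heap, entry)

    def next_driver(self):
        return heapq.heappop(self._heap)[2]


dispatcher = DriverQueue({"URGENT": 1.0})
dispatcher.submit("driver-1")
dispatcher.submit("driver-2", queue="URGENT")
dispatcher.submit("driver-3")
dispatcher.submit("driver-4", queue="URGENT")
order = [dispatcher.next_driver() for _ in range(4)]
print(order)
```

With no extra queues configured, every driver lands in the default queue and the sketch degrades to plain FIFO, which matches the backward-compatible behavior the description asks for.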






[jira] [Assigned] (SPARK-23674) Add Spark ML Listener for Tracking ML Pipeline Status

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23674:


Assignee: Apache Spark

> Add Spark ML Listener for Tracking ML Pipeline Status
> -
>
> Key: SPARK-23674
> URL: https://issues.apache.org/jira/browse/SPARK-23674
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Mingjie Tang
>Assignee: Apache Spark
>Priority: Major
>
> Currently, Spark provides status monitoring for different components of 
> Spark, such as the Spark history server, the streaming listener and the SQL listener. 
> The use cases would be (1) a front-end UI to track the status of the training coverage 
> rate during iteration, so that DS can understand how the job converges during 
> training, e.g. for K-means, logistic and other linear regression models, and (2) 
> tracking the data lineage for the input and output of training data.  
> In this proposal, we hope to provide a Spark ML pipeline listener to track the 
> status of a Spark ML pipeline, including: 
>  # ML pipeline created and saved 
>  # ML pipeline model created, saved and loaded  
>  # ML model training status monitoring  
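
One way to picture the proposed listener is a small observer interface with one callback per tracked event. The sketch below is an assumption-laden illustration in Python: `MLPipelineListener`, `ListenerBus` and every callback name are hypothetical and do not correspond to an existing Spark API or to the eventual pull request.

```python
class MLPipelineListener:
    """Hypothetical listener interface; callbacks default to no-ops."""

    def on_pipeline_created(self, pipeline_id):
        pass

    def on_pipeline_saved(self, pipeline_id, path):
        pass

    def on_model_created(self, model_id):
        pass

    def on_model_saved(self, model_id, path):
        pass

    def on_model_loaded(self, model_id, path):
        pass

    def on_training_progress(self, model_id, iteration, metric):
        pass


class ListenerBus:
    """Minimal dispatcher that forwards each event to every listener."""

    def __init__(self):
        self._listeners = []

    def register(self, listener):
        self._listeners.append(listener)

    def post(self, event, *args):
        for listener in self._listeners:
            getattr(listener, event)(*args)


class RecordingListener(MLPipelineListener):
    """Example listener that keeps the per-iteration training metric."""

    def __init__(self):
        self.history = []

    def on_training_progress(self, model_id, iteration, metric):
        self.history.append((iteration, metric))


bus = ListenerBus()
recorder = RecordingListener()
bus.register(recorder)
bus.post("on_pipeline_created", "pipeline-1")
for iteration, loss in enumerate([0.9, 0.5, 0.3]):
    bus.post("on_training_progress", "kmeans-1", iteration, loss)
print(recorder.history)
```

A UI could then read `history` to plot how training progresses, which is the first use case named in the description.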






[jira] [Commented] (SPARK-23674) Add Spark ML Listener for Tracking ML Pipeline Status

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398996#comment-16398996
 ] 

Apache Spark commented on SPARK-23674:
--

User 'jmwdpk' has created a pull request for this issue:
https://github.com/apache/spark/pull/20823

> Add Spark ML Listener for Tracking ML Pipeline Status
> -
>
> Key: SPARK-23674
> URL: https://issues.apache.org/jira/browse/SPARK-23674
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Mingjie Tang
>Priority: Major
>
> Currently, Spark provides status monitoring for different components of 
> Spark, such as the Spark history server, the streaming listener and the SQL listener. 
> The use cases would be (1) a front-end UI to track the status of the training coverage 
> rate during iteration, so that DS can understand how the job converges during 
> training, e.g. for K-means, logistic and other linear regression models, and (2) 
> tracking the data lineage for the input and output of training data.  
> In this proposal, we hope to provide a Spark ML pipeline listener to track the 
> status of a Spark ML pipeline, including: 
>  # ML pipeline created and saved 
>  # ML pipeline model created, saved and loaded  
>  # ML model training status monitoring  






[jira] [Assigned] (SPARK-23674) Add Spark ML Listener for Tracking ML Pipeline Status

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23674:


Assignee: (was: Apache Spark)

> Add Spark ML Listener for Tracking ML Pipeline Status
> -
>
> Key: SPARK-23674
> URL: https://issues.apache.org/jira/browse/SPARK-23674
> Project: Spark
>  Issue Type: Improvement
>  Components: ML
>Affects Versions: 2.3.0
>Reporter: Mingjie Tang
>Priority: Major
>
> Currently, Spark provides status monitoring for different components of 
> Spark, such as the Spark history server, the streaming listener and the SQL listener. 
> The use cases would be (1) a front-end UI to track the status of the training coverage 
> rate during iteration, so that DS can understand how the job converges during 
> training, e.g. for K-means, logistic and other linear regression models, and (2) 
> tracking the data lineage for the input and output of training data.  
> In this proposal, we hope to provide a Spark ML pipeline listener to track the 
> status of a Spark ML pipeline, including: 
>  # ML pipeline created and saved 
>  # ML pipeline model created, saved and loaded  
>  # ML model training status monitoring  






[jira] [Created] (SPARK-23683) FileCommitProtocol.instantiate to require 3-arg constructor for dynamic partition overwrite

2018-03-14 Thread Steve Loughran (JIRA)
Steve Loughran created SPARK-23683:
--

 Summary: FileCommitProtocol.instantiate to require 3-arg 
constructor for dynamic partition overwrite
 Key: SPARK-23683
 URL: https://issues.apache.org/jira/browse/SPARK-23683
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Steve Loughran


With SPARK-20236, {{FileCommitProtocol.instantiate()}} looks for a 
three-argument constructor, passing in the {{dynamicPartitionOverwrite}} 
parameter. If there is no such constructor, it falls back to the classic 
two-arg one.

When {{InsertIntoHadoopFsRelationCommand}} passes down that 
{{dynamicPartitionOverwrite}} flag to  {{FileCommitProtocol.instantiate()}}, it 
_assumes_ that the instantiated protocol supports the specific requirements of 
dynamic partition overwrite. It does not notice when this does not hold, and so 
the output generated may be incorrect.

Proposed: when dynamicPartitionOverwrite == true, require the protocol 
implementation to have a 3-arg constructor.
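
The lookup and the proposed stricter check can be sketched like this. It is a Python analogy of the reflection logic, not the Scala code in Spark: `instantiate`, `LegacyProtocol` and `DynamicProtocol` are hypothetical stand-ins, and constructor arity is inspected with the standard `inspect` module.

```python
import inspect


class LegacyProtocol:
    """Stand-in for a committer that only has the classic 2-arg constructor."""

    def __init__(self, job_id, output_path):
        self.job_id = job_id
        self.output_path = output_path


class DynamicProtocol:
    """Stand-in for a committer that accepts the dynamic-overwrite flag."""

    def __init__(self, job_id, output_path, dynamic_partition_overwrite):
        self.job_id = job_id
        self.output_path = output_path
        self.dynamic_partition_overwrite = dynamic_partition_overwrite


def instantiate(cls, job_id, output_path, dynamic_partition_overwrite=False):
    # Number of constructor parameters, not counting `self`.
    arity = len(inspect.signature(cls).parameters)
    if arity >= 3:
        return cls(job_id, output_path, dynamic_partition_overwrite)
    if dynamic_partition_overwrite:
        # Proposed behavior: fail fast rather than silently drop the flag.
        raise TypeError(
            f"{cls.__name__} has no 3-arg constructor, so it cannot be used "
            "with dynamic partition overwrite"
        )
    # Fallback kept for protocols that predate the flag.
    return cls(job_id, output_path)


modern = instantiate(DynamicProtocol, "job-1", "/tmp/out", True)
legacy = instantiate(LegacyProtocol, "job-1", "/tmp/out")
print(type(modern).__name__, type(legacy).__name__)
```

The point of the check is that the two-arg fallback stays available for ordinary writes, while a request for dynamic partition overwrite can no longer reach a protocol that never sees the flag.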








[jira] [Commented] (SPARK-20536) Extend ColumnName to create StructFields with explicit nullable

2018-03-14 Thread Jacek Laskowski (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398899#comment-16398899
 ] 

Jacek Laskowski commented on SPARK-20536:
-

I'm not sure how meaningful it still is, but given it's still open, it could be 
fixed.

> Extend ColumnName to create StructFields with explicit nullable
> ---
>
> Key: SPARK-20536
> URL: https://issues.apache.org/jira/browse/SPARK-20536
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>
> {{ColumnName}} defines methods to create {{StructFields}}.
> It'd be very user-friendly if there were methods to create {{StructFields}} 
> with explicit {{nullable}} property (currently implicitly {{true}}).
> That could look as follows:
> {code}
> // E.g. def int: StructField = StructField(name, IntegerType)
> def int(nullable: Boolean): StructField = StructField(name, IntegerType, 
> nullable)
> // or (untested)
> def int(nullable: Boolean): StructField = int.copy(nullable = nullable)
> {code}






[jira] [Commented] (SPARK-23499) Mesos Cluster Dispatcher should support priority queues to submit drivers

2018-03-14 Thread Susan X. Huynh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398880#comment-16398880
 ] 

Susan X. Huynh commented on SPARK-23499:


To ensure a consistent policy, maybe the queue should always be based on the 
role? Instead of making it the user's responsibility to create the mapping: 
when submitting an application, the user only specifies the role, not the 
queue. Spark would infer the queue. Have you considered designing it this way? 
[~pgillet]

cc [~skonto]

> Mesos Cluster Dispatcher should support priority queues to submit drivers
> -
>
> Key: SPARK-23499
> URL: https://issues.apache.org/jira/browse/SPARK-23499
> Project: Spark
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 2.2.1, 2.2.2, 2.3.0, 2.3.1
>Reporter: Pascal GILLET
>Priority: Major
> Attachments: Screenshot from 2018-02-28 17-22-47.png
>
>
> As for Yarn, Mesos users should be able to specify priority queues to define 
> a workload management policy for queued drivers in the Mesos Cluster 
> Dispatcher.
> Submitted drivers are *currently* kept in order of their submission: the 
> first driver added to the queue will be the first one to be executed (FIFO).
> Each driver could have a "priority" associated with it. A driver with high 
> priority is served (Mesos resources) before a driver with low priority. If 
> two drivers have the same priority, they are served according to their submit 
> date in the queue.
> To set up such priority queues, the following changes are proposed:
>  * The Mesos Cluster Dispatcher can optionally be configured with the 
> _spark.mesos.dispatcher.queue.[QueueName]_ property. This property takes a 
> float as value. This adds a new queue named _QueueName_ for submitted drivers 
> with the specified priority.
>  Higher numbers indicate higher priority.
>  The user can then specify multiple queues.
>  * A driver can be submitted to a specific queue with 
> _spark.mesos.dispatcher.queue_. This property takes the name of a queue 
> previously declared in the dispatcher as value.
> By default, the dispatcher has a single "default" queue with 0.0 priority 
> (cannot be overridden). If none of the properties above are specified, the 
> behavior is the same as the current one (i.e. simple FIFO).
> Additionally, it is possible to implement a consistent and overall workload 
> management policy throughout the lifecycle of drivers by mapping these 
> priority queues to weighted Mesos roles if any (i.e. from the QUEUED state in 
> the dispatcher to the final states in the Mesos cluster), and by specifying a 
> _spark.mesos.role_ along with a _spark.mesos.dispatcher.queue_ when 
> submitting an application.
> For example, with the URGENT Mesos role:
> {code:java}
> # Conf on the dispatcher side
> spark.mesos.dispatcher.queue.URGENT=1.0
> # Conf on the driver side
> spark.mesos.dispatcher.queue=URGENT
> spark.mesos.role=URGENT
> {code}
>  






[jira] [Updated] (SPARK-22674) PySpark breaks serialization of namedtuple subclasses

2018-03-14 Thread Jonas Amrich (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonas Amrich updated SPARK-22674:
-
Affects Version/s: 2.3.0

> PySpark breaks serialization of namedtuple subclasses
> -
>
> Key: SPARK-22674
> URL: https://issues.apache.org/jira/browse/SPARK-22674
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Jonas Amrich
>Priority: Major
>
> PySpark monkey-patches the namedtuple class to make it serializable; however, 
> this breaks serialization of its subclasses. With the current implementation, any 
> subclass will be serialized (and deserialized) as its parent namedtuple. 
> Consider this code, which will fail with {{AttributeError: 'Point' object has 
> no attribute 'sum'}}:
> {code}
> from collections import namedtuple
> Point = namedtuple("Point", "x y")
> class PointSubclass(Point):
>     def sum(self):
>         return self.x + self.y
> rdd = spark.sparkContext.parallelize([[PointSubclass(1, 1)]])
> rdd.collect()[0][0].sum()
> {code}
> Moreover, as PySpark hijacks all namedtuples in the main module, importing 
> pyspark breaks serialization of namedtuple subclasses even in code which is 
> not related to spark / distributed execution. I don't see any clean solution 
> to this; a possible workaround may be to limit the serialization hack to 
> direct namedtuple subclasses, as in 
> https://github.com/JonasAmrich/spark/commit/f3efecee28243380ecf6657fe54e1a165c1b7204
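
The failure mode can be reproduced without a cluster. The sketch below assumes a simplified model of the patch: a `__reduce__` that records only the class name, field names and values, so anything defined on a subclass is lost. `patch_for_serialization` and `_restore` are hypothetical names for this illustration and are not copied from PySpark.

```python
from collections import namedtuple


def _restore(name, fields, values):
    # Rebuilds a brand-new namedtuple class from its name and fields only.
    return namedtuple(name, fields)(*values)


def patch_for_serialization(cls):
    # Simplified, assumed model of the monkey patch: instances are reduced to
    # (class name, field names, values), and subclasses inherit this method.
    def __reduce__(self):
        return _restore, (cls.__name__, cls._fields, tuple(self))

    cls.__reduce__ = __reduce__
    return cls


Point = patch_for_serialization(namedtuple("Point", "x y"))


class PointSubclass(Point):
    def sum(self):
        return self.x + self.y


original = PointSubclass(1, 1)
# pickle stores the callable and arguments returned by __reduce__ and calls
# them on load; doing the same by hand shows what comes back.
rebuild, args = original.__reduce__()
restored = rebuild(*args)

print(type(original).__name__, hasattr(original, "sum"))
print(type(restored).__name__, hasattr(restored, "sum"))
```

The restored object is a plain `Point` with the same field values but without the `sum` method, which is why the call on the collected element raises `AttributeError`.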






[jira] [Commented] (SPARK-22674) PySpark breaks serialization of namedtuple subclasses

2018-03-14 Thread Jonas Amrich (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398842#comment-16398842
 ] 

Jonas Amrich commented on SPARK-22674:
--

[~lebedev] I completely agree, removing it is IMHO the cleanest solution.

I think namedtuples were originally used for representation of structured data 
in RDDs - see -SPARK-2010- and -SPARK-1687- . This mechanism was then replaced 
by the Row class in the DataFrame API, and it seems to me that it is no longer used in 
PySpark internals. However, removing it would still be a backward-compatibility (BC) break.

> PySpark breaks serialization of namedtuple subclasses
> -
>
> Key: SPARK-22674
> URL: https://issues.apache.org/jira/browse/SPARK-22674
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.2.0
>Reporter: Jonas Amrich
>Priority: Major
>
> PySpark monkey-patches the namedtuple class to make it serializable; however, 
> this breaks serialization of its subclasses. With the current implementation, any 
> subclass will be serialized (and deserialized) as its parent namedtuple. 
> Consider this code, which will fail with {{AttributeError: 'Point' object has 
> no attribute 'sum'}}:
> {code}
> from collections import namedtuple
> Point = namedtuple("Point", "x y")
> class PointSubclass(Point):
>     def sum(self):
>         return self.x + self.y
> rdd = spark.sparkContext.parallelize([[PointSubclass(1, 1)]])
> rdd.collect()[0][0].sum()
> {code}
> Moreover, as PySpark hijacks all namedtuples in the main module, importing 
> pyspark breaks serialization of namedtuple subclasses even in code which is 
> not related to spark / distributed execution. I don't see any clean solution 
> to this; a possible workaround may be to limit the serialization hack to 
> direct namedtuple subclasses, as in 
> https://github.com/JonasAmrich/spark/commit/f3efecee28243380ecf6657fe54e1a165c1b7204






[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-14 Thread Yuriy Bondaruk (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuriy Bondaruk updated SPARK-23682:
---
Attachment: Spark executors GC time.png
Screen Shot 2018-03-10 at 18.53.49.png
Screen Shot 2018-03-07 at 21.52.17.png

> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Spark executors GC time.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> container_1520214726510_0001_01_03 on host: 
> ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
> Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
> .readStream()
> .schema(inputSchema)
> .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
> .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
> .csv("s3://test-bucket/input")
> .as(Encoders.bean(TestRecord.class))
> .flatMap(mf, Encoders.bean(TestRecord.class))
> .dropDuplicates("testId", "testName")
> .withColumn("year", 
> functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
> ""))
> .writeStream()
> .option("path", "s3://test-bucket/output")
> .option("checkpointLocation", "s3://test-bucket/checkpoint")
> .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
> .partitionBy("year")
> .format("parquet")
> .outputMode(OutputMode.Append())
> .queryName("test-stream")
> .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by 
> {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}
>  that is referenced from 
> [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196]
>  
> At first glance this looks normal, since that is how Spark keeps aggregation 
> keys in memory. However, I did my testing by renaming files in the source folder 
> so that Spark would pick them up again. Since the input records are the 
> same, all further rows should be rejected as duplicates and memory consumption 
> should not increase, yet it does. Moreover, GC time took more than 30% of 
> total processing time.
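
The reporter's expectation can be stated as a small model: if deduplication state is a set of the key columns seen so far, replaying identical input should emit nothing and leave the state size unchanged. The Python sketch below only illustrates that expectation; `DedupState` is a hypothetical toy, not how `HDFSBackedStateStoreProvider` stores or versions state.

```python
class DedupState:
    """Toy stand-in for the keys retained by a streaming dropDuplicates."""

    def __init__(self, key_columns):
        self.key_columns = key_columns
        self.seen = set()

    def process_batch(self, rows):
        # Emit a row only the first time its key is observed.
        emitted = []
        for row in rows:
            key = tuple(row[column] for column in self.key_columns)
            if key not in self.seen:
                self.seen.add(key)
                emitted.append(row)
        return emitted


batch = [
    {"testId": 1, "testName": "a"},
    {"testId": 2, "testName": "b"},
    {"testId": 1, "testName": "a"},
]
state = DedupState(["testId", "testName"])
first = state.process_batch(batch)
size_after_first = len(state.seen)
# Replaying the same records models renaming the source files.
replay = state.process_batch(batch)
size_after_replay = len(state.seen)
print(len(first), len(replay), size_after_first, size_after_replay)
```

Under this model the replay adds no keys, so the growth seen in the heap dump would have to come from somewhere other than the set of distinct keys.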




[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-14 Thread Yuriy Bondaruk (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuriy Bondaruk updated SPARK-23682:
---
Description: 
It seems like there is an issue with memory in structured streaming. A stream 
with aggregation (dropDuplicates()) and data partitioning constantly increases 
memory usage and finally executors fail with exit code 137:

{quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
tasks) Reason: Container marked as failed: 
container_1520214726510_0001_01_03 on host: 
ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal{quote}

Stream creation looks something like this:
{quote}session
.readStream()
.schema(inputSchema)
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv("s3://test-bucket/input")
.as(Encoders.bean(TestRecord.class))
.flatMap(mf, Encoders.bean(TestRecord.class))
.dropDuplicates("testId", "testName")
.withColumn("year", 
functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
""))
.writeStream()
.option("path", "s3://test-bucket/output")
.option("checkpointLocation", "s3://test-bucket/checkpoint")
.trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
.partitionBy("year")
.format("parquet")
.outputMode(OutputMode.Append())
.queryName("test-stream")
.start();{quote}

Analyzing the heap dump, I found that most of the memory is used by 
{{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}} 
that is referenced from 
[StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196]
 

At first glance this looks normal, since that is how Spark keeps aggregation 
keys in memory. However, I did my testing by renaming files in the source folder 
so that Spark would pick them up again. Since the input records are the same, 
all further rows should be rejected as duplicates and memory consumption 
should not increase, yet it does. Moreover, GC time took more than 30% of 
total processing time.

  was:
It seems like there is an issue with memory in structured streaming. A stream 
with aggregation (dropDuplicates()) and data partitioning constantly increases 
memory usage and finally executors fails with exit code 137:

{quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
tasks) Reason: Container marked as failed: 
container_1520214726510_0001_01_03 on host: 
ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal{quote}

Stream creating looks something like this:

session
.readStream()
.schema(inputSchema)
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv("s3://test-bucket/input")
.as(Encoders.bean(TestRecord.class))
.flatMap(mf, Encoders.bean(TestRecord.class))
.dropDuplicates("testId", "testName")
.withColumn("year", 
functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
""))
.writeStream()
.option("path", "s3://test-bucket/output")
.option("checkpointLocation", "s3://test-bucket/checkpoint")
.trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
.partitionBy("year")
.format("parquet")
.outputMode(OutputMode.Append())
.queryName("test-stream")
.start();

Analyzing the heap dump I found that most of the memory used by 
{{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}} 
that is referenced from 
[StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196]
 

On the first glance it looks normal since that is how Spark keeps aggregation 
keys in memory. However I did my testing by renaming files in source folder, so 
that they could be picked up by spark again. Since input records are the same 
all further rows should be rejected as duplicates and memory consumption 
shouldn't increase but it's not true. Moreover, GC time took more than 30% of 
total processing time.


> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> 

[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-14 Thread Yuriy Bondaruk (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuriy Bondaruk updated SPARK-23682:
---
Description: 
It seems like there is an issue with memory in structured streaming. A stream 
with aggregation (dropDuplicates()) and data partitioning constantly increases 
memory usage and finally executors fail with exit code 137:

{quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
tasks) Reason: Container marked as failed: 
container_1520214726510_0001_01_03 on host: 
ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal{quote}

Stream creation looks something like this:

session
.readStream()
.schema(inputSchema)
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv("s3://test-bucket/input")
.as(Encoders.bean(TestRecord.class))
.flatMap(mf, Encoders.bean(TestRecord.class))
.dropDuplicates("testId", "testName")
.withColumn("year", 
functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
""))
.writeStream()
.option("path", "s3://test-bucket/output")
.option("checkpointLocation", "s3://test-bucket/checkpoint")
.trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
.partitionBy("year")
.format("parquet")
.outputMode(OutputMode.Append())
.queryName("test-stream")
.start();

Analyzing the heap dump, I found that most of the memory is used by 
{{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}} 
that is referenced from 
[StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196]
 

At first glance this looks normal, since that is how Spark keeps aggregation 
keys in memory. However, I did my testing by renaming files in the source folder 
so that Spark would pick them up again. Since the input records are the same, 
all further rows should be rejected as duplicates and memory consumption 
should not increase, yet it does. Moreover, GC time took more than 30% of 
total processing time.

  was:
It seems like there is an issue with memory in structured streaming. A stream 
with aggregation (dropDuplicates()) and data partitioning constantly increases 
memory usage and finally executors fails with exit code 137:

{{ExecutorLostFailure (executor 2 exited caused by one of the running tasks) 
Reason: Container marked as failed: container_1520214726510_0001_01_03 on 
host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal}}

Stream creating looks something like this:

{{session
.readStream()
.schema(inputSchema)
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv("s3://test-bucket/input")
.as(Encoders.bean(TestRecord.class))
.flatMap(mf, Encoders.bean(TestRecord.class))
.dropDuplicates("testId", "testName")
.withColumn("year", 
functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
""))
.writeStream()
.option("path", "s3://test-bucket/output")
.option("checkpointLocation", "s3://test-bucket/checkpoint")
.trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
.partitionBy("year")
.format("parquet")
.outputMode(OutputMode.Append())
.queryName("test-stream")
.start();}}

Analyzing the heap dump I found that most of the memory used by 
{{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}} 
that is referenced from 
[StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196]
 

On the first glance it looks normal since that is how Spark keeps aggregation 
keys in memory. However I did my testing by renaming files in source folder, so 
that they could be picked up by spark again. Since input records are the same 
all further rows should be rejected as duplicates and memory consumption 
shouldn't increase but it's not true. Moreover, GC time took more than 30% of 
total processing time.


> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> 

[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-14 Thread Yuriy Bondaruk (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuriy Bondaruk updated SPARK-23682:
---
Description: 
It seems like there is an issue with memory in structured streaming. A stream 
with aggregation (dropDuplicates()) and data partitioning constantly increases 
memory usage and finally executors fail with exit code 137:

{{ExecutorLostFailure (executor 2 exited caused by one of the running tasks) 
Reason: Container marked as failed: container_1520214726510_0001_01_03 on 
host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal}}

Stream creation looks something like this:

{{session
.readStream()
.schema(inputSchema)
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv("s3://test-bucket/input")
.as(Encoders.bean(TestRecord.class))
.flatMap(mf, Encoders.bean(TestRecord.class))
.dropDuplicates("testId", "testName")
.withColumn("year", 
functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
""))
.writeStream()
.option("path", "s3://test-bucket/output")
.option("checkpointLocation", "s3://test-bucket/checkpoint")
.trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
.partitionBy("year")
.format("parquet")
.outputMode(OutputMode.Append())
.queryName("test-stream")
.start();}}

Analyzing the heap dump, I found that most of the memory is used by 
{{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}, 
which is referenced from 
[StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196]
 

At first glance this looks normal, since that is how Spark keeps aggregation 
keys in memory. However, I did my testing by renaming files in the source 
folder so that they would be picked up by Spark again. Since the input records 
are the same, all further rows should be rejected as duplicates and memory 
consumption shouldn't increase, but it does. Moreover, GC time took more than 
30% of total processing time.

  was:
It seems like there is an issue with memory in structured streaming. A stream 
with aggregation (dropDuplicates()) and data partitioning constantly increases 
memory usage and finally executors fails with exit code 137:

{{ExecutorLostFailure (executor 2 exited caused by one of the running tasks) 
Reason: Container marked as failed: container_1520214726510_0001_01_03 on 
host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
Container killed on request. Exit code is 137}}
{{ Container exited with a non-zero exit code 137}}
{{ Killed by external signal}}

Stream creating looks something like this:

{{session}}
{{ .readStream()}}
{{ .schema(inputSchema)}}
{{ .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)}}
{{ .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)}}
{{ .csv("s3://test-bucket/input")}}
{{ .as(Encoders.bean(TestRecord.class))}}
{{ .flatMap(mf, Encoders.bean(TestRecord.class))}}
{{ .dropDuplicates("testId", "testName")}}
{{ .withColumn("year", 
functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
""))}}
{{ .writeStream()}}
{{ .option("path", "s3://test-bucket/output")}}
{{ .option("checkpointLocation", "s3://test-bucket/checkpoint")}}
{{ .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))}}
{{ .partitionBy("year")}}
{{ .format("parquet")}}
{{ .outputMode(OutputMode.Append())}}
{{ .queryName("test-stream")}}
{{ .start();}}

Analyzing the heap dump I found that most of the memory used by 
{{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}} 
that is referenced from 
[[StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196]|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196]
 

On the first glance it looks normal since that is how Spark keeps aggregation 
keys in memory. However I did my testing by renaming files in source folder, so 
that they could be picked up by spark again. Since input records are the same 
all further rows should be rejected as duplicates and memory consumption 
shouldn't increase but it's not true. Moreover, GC time took more than 30% of 
total processing time.


> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>

[jira] [Created] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-14 Thread Yuriy Bondaruk (JIRA)
Yuriy Bondaruk created SPARK-23682:
--

 Summary: Memory issue with Spark structured streaming
 Key: SPARK-23682
 URL: https://issues.apache.org/jira/browse/SPARK-23682
 Project: Spark
  Issue Type: Bug
  Components: SQL, Structured Streaming
Affects Versions: 2.2.0
 Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
|spark.blacklist.decommissioning.enabled|true|
|spark.blacklist.decommissioning.timeout|1h|
|spark.cleaner.periodicGC.interval|10min|
|spark.default.parallelism|18|
|spark.dynamicAllocation.enabled|false|
|spark.eventLog.enabled|true|
|spark.executor.cores|3|
|spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
-XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
|spark.executor.id|driver|
|spark.executor.instances|3|
|spark.executor.memory|22G|
|spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
|spark.hadoop.parquet.enable.summary-metadata|false|
|spark.hadoop.yarn.timeline-service.enabled|false|
|spark.jars| |
|spark.master|yarn|
|spark.memory.fraction|0.9|
|spark.memory.storageFraction|0.3|
|spark.memory.useLegacyMode|false|
|spark.rdd.compress|true|
|spark.resourceManager.cleanupExpiredHost|true|
|spark.scheduler.mode|FIFO|
|spark.serializer|org.apache.spark.serializer.KryoSerializer|
|spark.shuffle.service.enabled|true|
|spark.speculation|false|
|spark.sql.parquet.filterPushdown|true|
|spark.sql.parquet.mergeSchema|false|
|spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
|spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
|spark.submit.deployMode|client|
|spark.yarn.am.cores|1|
|spark.yarn.am.memory|2G|
|spark.yarn.am.memoryOverhead|1G|
|spark.yarn.executor.memoryOverhead|3G|
Reporter: Yuriy Bondaruk


It seems like there is a memory issue in structured streaming. A stream 
with aggregation (dropDuplicates()) and data partitioning constantly increases 
memory usage, and eventually the executors fail with exit code 137:

{{ExecutorLostFailure (executor 2 exited caused by one of the running tasks) 
Reason: Container marked as failed: container_1520214726510_0001_01_03 on 
host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
Container killed on request. Exit code is 137}}
{{ Container exited with a non-zero exit code 137}}
{{ Killed by external signal}}

Stream creation looks something like this:

{{session}}
{{ .readStream()}}
{{ .schema(inputSchema)}}
{{ .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)}}
{{ .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)}}
{{ .csv("s3://test-bucket/input")}}
{{ .as(Encoders.bean(TestRecord.class))}}
{{ .flatMap(mf, Encoders.bean(TestRecord.class))}}
{{ .dropDuplicates("testId", "testName")}}
{{ .withColumn("year", 
functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
""))}}
{{ .writeStream()}}
{{ .option("path", "s3://test-bucket/output")}}
{{ .option("checkpointLocation", "s3://test-bucket/checkpoint")}}
{{ .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))}}
{{ .partitionBy("year")}}
{{ .format("parquet")}}
{{ .outputMode(OutputMode.Append())}}
{{ .queryName("test-stream")}}
{{ .start();}}

Analyzing the heap dump, I found that most of the memory is used by 
{{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}, 
which is referenced from 
[StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196]
 

At first glance this looks normal, since that is how Spark keeps aggregation 
keys in memory. However, I did my testing by renaming files in the source 
folder so that they would be picked up by Spark again. Since the input records 
are the same, all further rows should be rejected as duplicates and memory 
consumption shouldn't increase, but it does. Moreover, GC time took more than 
30% of total processing time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19842) Informational Referential Integrity Constraints Support in Spark

2018-03-14 Thread Ioana Delaney (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398819#comment-16398819
 ] 

Ioana Delaney commented on SPARK-19842:
---

[~cloud_fan] The document has already been uploaded to Google Docs. Please see 
the link above.

> Informational Referential Integrity Constraints Support in Spark
> 
>
> Key: SPARK-19842
> URL: https://issues.apache.org/jira/browse/SPARK-19842
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Ioana Delaney
>Priority: Major
> Attachments: InformationalRIConstraints.doc
>
>
> *Informational Referential Integrity Constraints Support in Spark*
> This work proposes support for _informational primary key_ and _foreign key 
> (referential integrity) constraints_ in Spark. The main purpose is to open up 
> an area of query optimization techniques that rely on referential integrity 
> constraints semantics. 
> An _informational_ or _statistical constraint_ is a constraint such as a 
> _unique_, _primary key_, _foreign key_, or _check constraint_, that can be 
> used by Spark to improve query performance. Informational constraints are not 
> enforced by the Spark SQL engine; rather, they are used by Catalyst to 
> optimize query processing. They provide semantic information that allows 
> Catalyst to rewrite queries to eliminate joins, push down aggregates, remove 
> unnecessary Distinct operations, and perform a number of other optimizations. 
> Informational constraints are primarily targeted to applications that load 
> and analyze data that originated from a data warehouse. For such 
> applications, the conditions for a given constraint are known to be true, so 
> the constraint does not need to be enforced during data load operations. 
> The attached document covers constraint definition, metastore storage, 
> constraint validation, and maintenance. The document shows many examples of 
> query performance improvements that utilize referential integrity constraints 
> and can be implemented in Spark.
> Link to the Google Doc: 
> [InformationalRIConstraints|https://docs.google.com/document/d/17r-cOqbKF7Px0xb9L7krKg2-RQB_gD2pxOmklm-ehsw/edit]






[jira] [Commented] (SPARK-22918) sbt test (spark - local) fail after upgrading to 2.2.1 with: java.security.AccessControlException: access denied org.apache.derby.security.SystemPermission( "engine",

2018-03-14 Thread Vivek (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398799#comment-16398799
 ] 

Vivek commented on SPARK-22918:
---

+1

> sbt test (spark - local) fail after upgrading to 2.2.1 with: 
> java.security.AccessControlException: access denied 
> org.apache.derby.security.SystemPermission( "engine", "usederbyinternals" )
> 
>
> Key: SPARK-22918
> URL: https://issues.apache.org/jira/browse/SPARK-22918
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.1
>Reporter: Damian Momot
>Priority: Major
>
> After upgrading from 2.2.0 to 2.2.1, the sbt test command in one of my 
> projects started to fail with the following exception:
> {noformat}
> java.security.AccessControlException: access denied 
> org.apache.derby.security.SystemPermission( "engine", "usederbyinternals" )
>   at 
> java.security.AccessControlContext.checkPermission(AccessControlContext.java:472)
>   at 
> java.security.AccessController.checkPermission(AccessController.java:884)
>   at 
> org.apache.derby.iapi.security.SecurityUtil.checkDerbyInternalsPrivilege(Unknown
>  Source)
>   at org.apache.derby.iapi.services.monitor.Monitor.startMonitor(Unknown 
> Source)
>   at org.apache.derby.iapi.jdbc.JDBCBoot$1.run(Unknown Source)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at org.apache.derby.iapi.jdbc.JDBCBoot.boot(Unknown Source)
>   at org.apache.derby.iapi.jdbc.JDBCBoot.boot(Unknown Source)
>   at org.apache.derby.jdbc.EmbeddedDriver.boot(Unknown Source)
>   at org.apache.derby.jdbc.EmbeddedDriver.(Unknown Source)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at java.lang.Class.newInstance(Class.java:442)
>   at 
> org.datanucleus.store.rdbms.connectionpool.AbstractConnectionPoolFactory.loadDriver(AbstractConnectionPoolFactory.java:47)
>   at 
> org.datanucleus.store.rdbms.connectionpool.BoneCPConnectionPoolFactory.createConnectionPool(BoneCPConnectionPoolFactory.java:54)
>   at 
> org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:238)
>   at 
> org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:131)
>   at 
> org.datanucleus.store.rdbms.ConnectionFactoryImpl.(ConnectionFactoryImpl.java:85)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
>   at 
> org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:325)
>   at 
> org.datanucleus.store.AbstractStoreManager.registerConnectionFactory(AbstractStoreManager.java:282)
>   at 
> org.datanucleus.store.AbstractStoreManager.(AbstractStoreManager.java:240)
>   at 
> org.datanucleus.store.rdbms.RDBMSStoreManager.(RDBMSStoreManager.java:286)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
>   at 
> org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
>   at 
> org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1187)
>   at org.datanucleus.NucleusContext.initialise(NucleusContext.java:356)
>   at 
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775)
>   at 
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
>   at 
> 

[jira] [Updated] (SPARK-23681) Switch OrcFileFormat to newer hadoop.mapreduce output classes

2018-03-14 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-23681:
---
Summary: Switch OrcFileFormat to newer hadoop.mapreduce output classes  
(was: Switch OrcFileFormat to using newer hadoop.mapreduce output classes)

> Switch OrcFileFormat to newer hadoop.mapreduce output classes
> -
>
> Key: SPARK-23681
> URL: https://issues.apache.org/jira/browse/SPARK-23681
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Steve Loughran
>Priority: Minor
>
> The classes in org.apache.spark.sql.execution.datasources.orc generate their 
> file output writer and bind to an output committer via the old, original, 
> barely maintained {{org.apache.hadoop.mapred.FileOutputFormat}}, which is 
> inflexible & doesn't support pluggable committers a la 
> MAPREDUCE-6956/HADOOP-13786.
> Moving to the hadoop.mapreduce packages for this is compatible with the Spark 
> layer, switches over to the maintained codebase & lets you pick up the new 
> committers.






[jira] [Assigned] (SPARK-23680) entrypoint.sh does not accept arbitrary UIDs, returning as an error

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23680:


Assignee: (was: Apache Spark)

> entrypoint.sh does not accept arbitrary UIDs, returning as an error
> ---
>
> Key: SPARK-23680
> URL: https://issues.apache.org/jira/browse/SPARK-23680
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
> Environment: OpenShift
>Reporter: Ricardo Martinelli de Oliveira
>Priority: Major
>  Labels: easyfix
>
> OpenShift supports running pods using arbitrary UIDs 
> ([https://docs.openshift.com/container-platform/3.7/creating_images/guidelines.html#openshift-specific-guidelines])
>  to improve security. Although entrypoint.sh was developed to cover this 
> feature, the script is returning an error[1].
> The issue is that the script uses getent to find the passwd entry of the 
> current UID, and if the entry is not found it creates an entry in 
> /etc/passwd. According to the getent man page:
> {code:java}
> EXIT STATUS
>    One of the following exit values can be returned by getent:
>   0 Command completed successfully.
>   1 Missing arguments, or database unknown.
>   2 One or more supplied key could not be found in the 
> database.
>   3 Enumeration not supported on this database.
> {code}
> The script begins with a "set -ex" command, which turns on debug output and 
> aborts the script if a command pipeline returns an exit code other than 0, 
> so the non-zero exit code from getent stops the script.
> That said, the line below must be changed to remove the "-e" flag 
> from the set command:
> https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L20
>  
>  
> [1]https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L25-L34
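The failure mode described in the issue can be sketched as follows, assuming a POSIX shell and a getent that returns exit status 2 for a missing key, as the quoted man page states. The helper names `lookup_passwd_entry` and `describe_uid` are hypothetical and are not part of entrypoint.sh. Guarding the single lookup is shown only as one possible alternative to dropping "-e" for the whole script; it is not necessarily the change made in the pull request.

```shell
#!/bin/sh
# Sketch: look up a passwd entry without letting a "not found" result
# abort a script that runs under `set -e`.
set -e

# Hypothetical helper: prints the passwd entry for the given key,
# or nothing if there is no such entry.
lookup_passwd_entry() {
    # getent exits with a non-zero status when the key is not found.
    # Because the command sits on the left-hand side of `||`, `set -e`
    # does not abort the script on that status.
    getent passwd "$1" || true
}

# Hypothetical helper: reports whether a passwd entry exists for a UID.
describe_uid() {
    entry=$(lookup_passwd_entry "$1")
    if [ -z "$entry" ]; then
        echo "no passwd entry for uid $1"
    else
        echo "found passwd entry for uid $1"
    fi
}

# Report on the UID the script is currently running as.
describe_uid "$(id -u)"
```

Without the `|| true` guard, a bare `entry=$(getent passwd "$1")` under `set -e` would stop the script as soon as the UID has no passwd entry, which matches the behavior reported for pods running with an arbitrary UID.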






[jira] [Commented] (SPARK-23680) entrypoint.sh does not accept arbitrary UIDs, returning as an error

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398779#comment-16398779
 ] 

Apache Spark commented on SPARK-23680:
--

User 'rimolive' has created a pull request for this issue:
https://github.com/apache/spark/pull/20822

> entrypoint.sh does not accept arbitrary UIDs, returning as an error
> ---
>
> Key: SPARK-23680
> URL: https://issues.apache.org/jira/browse/SPARK-23680
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
> Environment: OpenShift
>Reporter: Ricardo Martinelli de Oliveira
>Priority: Major
>  Labels: easyfix
>
> OpenShift supports running pods using arbitrary UIDs 
> ([https://docs.openshift.com/container-platform/3.7/creating_images/guidelines.html#openshift-specific-guidelines])
>  to improve security. Although entrypoint.sh was developed to cover this 
> feature, the script is returning an error[1].
> The issue is that the script uses getent to find the passwd entry of the 
> current UID, and if the entry is not found it creates an entry in 
> /etc/passwd. According to the getent man page:
> {code:java}
> EXIT STATUS
>    One of the following exit values can be returned by getent:
>   0 Command completed successfully.
>   1 Missing arguments, or database unknown.
>   2 One or more supplied key could not be found in the 
> database.
>   3 Enumeration not supported on this database.
> {code}
> The script begins with a "set -ex" command, which turns on debug output and 
> aborts the script if a command pipeline returns an exit code other than 0, 
> so the non-zero exit code from getent stops the script.
> That said, the line below must be changed to remove the "-e" flag 
> from the set command:
> https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L20
>  
>  
> [1]https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L25-L34






[jira] [Assigned] (SPARK-23680) entrypoint.sh does not accept arbitrary UIDs, returning as an error

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23680:


Assignee: Apache Spark

> entrypoint.sh does not accept arbitrary UIDs, returning as an error
> ---
>
> Key: SPARK-23680
> URL: https://issues.apache.org/jira/browse/SPARK-23680
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
> Environment: OpenShift
>Reporter: Ricardo Martinelli de Oliveira
>Assignee: Apache Spark
>Priority: Major
>  Labels: easyfix
>
> OpenShift supports running pods using arbitrary UIDs 
> ([https://docs.openshift.com/container-platform/3.7/creating_images/guidelines.html#openshift-specific-guidelines])
>  to improve security. Although entrypoint.sh was developed to cover this 
> feature, the script is returning an error[1].
> The issue is that the script uses getent to find the passwd entry of the 
> current UID, and if the entry is not found it creates an entry in 
> /etc/passwd. According to the getent man page:
> {code:java}
> EXIT STATUS
>    One of the following exit values can be returned by getent:
>   0 Command completed successfully.
>   1 Missing arguments, or database unknown.
>   2 One or more supplied key could not be found in the 
> database.
>   3 Enumeration not supported on this database.
> {code}
> The script begins with a "set -ex" command, which turns on debug output and 
> aborts the script if a command pipeline returns an exit code other than 0, 
> so the non-zero exit code from getent stops the script.
> That said, the line below must be changed to remove the "-e" flag 
> from the set command:
> https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L20
>  
>  
> [1]https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L25-L34






[jira] [Updated] (SPARK-23681) Switch OrcFileFormat to using newer hadoop.mapreduce output classes

2018-03-14 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran updated SPARK-23681:
---
Summary: Switch OrcFileFormat to using newer hadoop.mapreduce output 
classes  (was: Move OrcFileFormat switch to using hadoop.mapreduce classes)

> Switch OrcFileFormat to using newer hadoop.mapreduce output classes
> ---
>
> Key: SPARK-23681
> URL: https://issues.apache.org/jira/browse/SPARK-23681
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Steve Loughran
>Priority: Minor
>
> The classes in org.apache.spark.sql.execution.datasources.orc generate their 
> file output writer and bind to an output committer via the old, original, 
> barely maintained {{org.apache.hadoop.mapred.FileOutputFormat}}, which is 
> inflexible & doesn't support pluggable committers a la 
> MAPREDUCE-6956/HADOOP-13786.
> Moving to the hadoop.mapreduce packages for this is compatible with the Spark 
> layer, switches over to the maintained codebase & lets you pick up the new 
> committers.






[jira] [Commented] (SPARK-20536) Extend ColumnName to create StructFields with explicit nullable

2018-03-14 Thread Efim Poberezkin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398754#comment-16398754
 ] 

Efim Poberezkin commented on SPARK-20536:
-

Hello [~jlaskowski], is this still a meaningful ticket? I could make a PR with 
a ColumnName extension if it is, but this functionality already seems to be 
present.

> Extend ColumnName to create StructFields with explicit nullable
> ---
>
> Key: SPARK-20536
> URL: https://issues.apache.org/jira/browse/SPARK-20536
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Jacek Laskowski
>Priority: Trivial
>
> {{ColumnName}} defines methods to create {{StructFields}}.
> It'd be very user-friendly if there were methods to create {{StructFields}} 
> with explicit {{nullable}} property (currently implicitly {{true}}).
> That could look as follows:
> {code}
> // E.g. def int: StructField = StructField(name, IntegerType)
> def int(nullable: Boolean): StructField = StructField(name, IntegerType, 
> nullable)
> // or (untested)
> def int(nullable: Boolean): StructField = int.copy(nullable = nullable)
> {code}






[jira] [Updated] (SPARK-23680) entrypoint.sh does not accept arbitrary UIDs, returning as an error

2018-03-14 Thread Erik Erlandson (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Erik Erlandson updated SPARK-23680:
---
 Flags: Important
Labels: easyfix  (was: )

> entrypoint.sh does not accept arbitrary UIDs, returning as an error
> ---
>
> Key: SPARK-23680
> URL: https://issues.apache.org/jira/browse/SPARK-23680
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
> Environment: OpenShift
>Reporter: Ricardo Martinelli de Oliveira
>Priority: Major
>  Labels: easyfix
>
> OpenShift supports running pods using arbitrary UIDs 
> ([https://docs.openshift.com/container-platform/3.7/creating_images/guidelines.html#openshift-specific-guidelines])
>  to improve security. Although entrypoint.sh was developed to cover this 
> feature, the script is returning an error[1].
> The issue is that the script uses getent to find the passwd entry of the 
> current UID, and if the entry is not found it creates an entry in 
> /etc/passwd. According to the getent man page:
> {code:java}
> EXIT STATUS
>    One of the following exit values can be returned by getent:
>   0 Command completed successfully.
>   1 Missing arguments, or database unknown.
>   2 One or more supplied key could not be found in the 
> database.
>   3 Enumeration not supported on this database.
> {code}
> The script begins with a "set -ex" command, which turns on debug output and 
> aborts the script if a command pipeline returns an exit code other than 0, 
> so the non-zero exit code from getent stops the script.
> That said, the line below must be changed to remove the "-e" flag 
> from the set command:
> https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L20
>  
>  
> [1]https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L25-L34






[jira] [Created] (SPARK-23681) Move OrcFileFormat switch to using hadoop.mapreduce classes

2018-03-14 Thread Steve Loughran (JIRA)
Steve Loughran created SPARK-23681:
--

 Summary: Move OrcFileFormat switch to using hadoop.mapreduce 
classes
 Key: SPARK-23681
 URL: https://issues.apache.org/jira/browse/SPARK-23681
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Steve Loughran


The classes in org.apache.spark.sql.execution.datasources.orc generate their 
file output writer and bind to an output committer via the old, original, 
barely maintained {{org.apache.hadoop.mapred.FileOutputFormat}}, which is 
inflexible & doesn't support pluggable committers a la 
MAPREDUCE-6956/HADOOP-13786.

Moving to the hadoop.mapreduce packages for this is compatible with the Spark 
layer, switches over to the maintained codebase & lets you pick up the new 
committers.






[jira] [Commented] (SPARK-23665) Add adaptive algorithm to select query result collect method

2018-03-14 Thread zhoukang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398706#comment-16398706
 ] 

zhoukang commented on SPARK-23665:
--

Add this logic so that users do not need to choose whether to set 
{code:java}
spark.sql.thriftServer.incrementalCollect
{code}
or not. Does this make sense? Thanks!

> Add adaptive algorithm to select query result collect method
> 
>
> Key: SPARK-23665
> URL: https://issues.apache.org/jira/browse/SPARK-23665
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1, 2.3.0
>Reporter: zhoukang
>Priority: Major
>
> Currently, we use configuration like 
> {code:java}
> spark.sql.thriftServer.incrementalCollect
> {code}
> to specify the query result collect method.
> Actually, we can estimate the size of the result and select the collect 
> method automatically. 






[jira] [Commented] (SPARK-23680) entrypoint.sh does not accept arbitrary UIDs, returning as an error

2018-03-14 Thread Erik Erlandson (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398701#comment-16398701
 ] 

Erik Erlandson commented on SPARK-23680:


[~rmartine] thanks for catching this! It will impact platforms running w/ 
anonymous uid such as OpenShift.

> entrypoint.sh does not accept arbitrary UIDs, returning as an error
> ---
>
> Key: SPARK-23680
> URL: https://issues.apache.org/jira/browse/SPARK-23680
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
> Environment: OpenShift
>Reporter: Ricardo Martinelli de Oliveira
>Priority: Major
>  Labels: easyfix
>






[jira] [Created] (SPARK-23680) entrypoint.sh does not accept arbitrary UIDs, returning as an error

2018-03-14 Thread Ricardo Martinelli de Oliveira (JIRA)
Ricardo Martinelli de Oliveira created SPARK-23680:
--

 Summary: entrypoint.sh does not accept arbitrary UIDs, returning 
as an error
 Key: SPARK-23680
 URL: https://issues.apache.org/jira/browse/SPARK-23680
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 2.3.0
 Environment: OpenShift
Reporter: Ricardo Martinelli de Oliveira


OpenShift supports running pods using arbitrary UIDs 
([https://docs.openshift.com/container-platform/3.7/creating_images/guidelines.html#openshift-specific-guidelines])
to improve security. Although entrypoint.sh was developed to cover this 
feature, the script is returning an error[1].

The issue is that the script uses getent to find the passwd entry of the 
current UID, and if the entry is not found it creates an entry in /etc/passwd. 
According to the getent man page:
{code:java}
EXIT STATUS
   One of the following exit values can be returned by getent:
  0 Command completed successfully.
  1 Missing arguments, or database unknown.
  2 One or more supplied key could not be found in the database.
  3 Enumeration not supported on this database.
{code}
The script begins with a "set -ex" command, which turns on debug output and 
aborts the script if a command pipeline returns an exit code other than 0. 
Because getent exits with 2 when the UID has no passwd entry, the script stops 
before it can create the entry.

That said, the line below must be changed to remove the "-e" flag from the 
set command:

https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L20

 

 
[1]https://github.com/apache/spark/blob/v2.3.0/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L25-L34






[jira] [Commented] (SPARK-22634) Update Bouncy castle dependency

2018-03-14 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398659#comment-16398659
 ] 

Steve Loughran commented on SPARK-22634:


Moving to jets3t 0.9.4 breaks the (legacy) s3 & s3n clients (SPARK-23562). 
Filed SPARK-23654 to cut jets3t out completely. There is no way for Spark to 
keep bouncy castle current and have the old FS connectors work. 
Simplest to cut the JAR and have those filesystems fail fast.

If this patch is backported, the release notes should tell people to switch to 
s3a, which they can only safely do with Hadoop 2.7.x+.

Or, put differently: this patch stops anyone with the ASF Hadoop-2.6 set of 
jars in their packaging from talking to s3. 

> Update Bouncy castle dependency
> ---
>
> Key: SPARK-22634
> URL: https://issues.apache.org/jira/browse/SPARK-22634
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core, SQL, Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Lior Regev
>Assignee: Sean Owen
>Priority: Minor
> Fix For: 2.3.0
>
>
> Spark's usage of the jets3t library, as well as Spark's own Flume and Kafka 
> streaming, uses bouncy castle version 1.51.
> This is an outdated version, as the latest one is 1.58.
> This, in turn, renders packages such as 
> [spark-hadoopcryptoledger-ds|https://github.com/ZuInnoTe/spark-hadoopcryptoledger-ds]
>  unusable, since these require 1.58 and Spark's distributions come with 
> 1.51.
> My own attempt was to run on EMR, and since I automatically get all of 
> Spark's dependencies (bouncy castle 1.51 being one of them) on the 
> classpath, using the library to parse blockchain data failed due to missing 
> functionality.
> I have also opened an 
> [issue|https://bitbucket.org/jmurty/jets3t/issues/242/bouncycastle-dependency]
>  with jets3t to update their dependency as well, but along with that Spark 
> would have to update its own or at least be packaged with a newer version.






[jira] [Commented] (SPARK-23654) cut jets3t as a dependency of spark-core; exclude it from hadoop-cloud module as incompatible

2018-03-14 Thread Steve Loughran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398653#comment-16398653
 ] 

Steve Loughran commented on SPARK-23654:


# In Hadoop 3.x anyone trying to create an s3n client is told to upgrade to the 
s3a connector; without that code snippet callers will get a class not found 
exception, which may lull them into thinking it's fixable.
# spark-kinesis declares a "provided" dependency on the jar. If it really needs 
it, it would make sense to scope it to compile, and at least remove it from the 
main spark-core transitive dependency list.

> cut jets3t as a dependency of spark-core; exclude it from hadoop-cloud module 
> as incompatible
> -
>
> Key: SPARK-23654
> URL: https://issues.apache.org/jira/browse/SPARK-23654
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Steve Loughran
>Priority: Minor
>
> Spark core declares a dependency on Jets3t, which pulls in other cruft.
> # The hadoop-cloud module pulls in the hadoop-aws module with the 
> jets3t-compatible connectors, and the relevant dependencies: the spark-core 
> dependency is incomplete if that module isn't built, and superfluous or 
> inconsistent if it is.
> # We've cut out s3n/s3 and all dependencies on jets3t entirely from Hadoop 
> 3.x in favour of the s3a connector, which we're willing to maintain.
> JetS3t was wonderful when it came out, but now the Amazon SDKs massively 
> exceed it in functionality, albeit at the expense of week-to-week stability 
> and JAR binary compatibility.






[jira] [Resolved] (SPARK-23652) Verify error when using ASF s3:// connector. & Jetty 0.9.4

2018-03-14 Thread Steve Loughran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steve Loughran resolved SPARK-23652.

Resolution: Duplicate

> Verify error when using ASF s3:// connector. & Jetty 0.9.4
> --
>
> Key: SPARK-23652
> URL: https://issues.apache.org/jira/browse/SPARK-23652
> Project: Spark
>  Issue Type: Question
>  Components: Spark Shell, Spark Submit
>Affects Versions: 1.6.0
>Reporter: Abhishek Shrivastava
>Priority: Minor
>
> In below spark-shell I am trying to connect to S3 and load file to create 
> dataframe:
>  
> {{spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
> scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> scala> sc.hadoopConfiguration.set("fs.s3a.access.key", "")
> scala> sc.hadoopConfiguration.set("fs.s3a.secret.key", "")
> scala> val weekly = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", ",").load("s3://usr_bucket/data/file.csv")
> scala> print(weekly)
> scala> weekly.show()}}
>  
>  
> {{Error:}}
> {{java.lang.VerifyError: Bad type on operand stack
> Exception Details:
>   Location:
>     org/apache/hadoop/fs/s3/Jets3tFileSystemStore.initialize(Ljava/net/URI;Lorg/apache/hadoop/conf/Configuration;)V @43: invokespecial
>   Reason:
>     Type 'org/jets3t/service/security/AWSCredentials' (current frame, stack[3]) is not assignable to 'org/jets3t/service/security/ProviderCredentials'
>   Current Frame:
>     bci: @43
>     flags: { }
>     locals: { 'org/apache/hadoop/fs/s3/Jets3tFileSystemStore', 'java/net/URI', 'org/apache/hadoop/conf/Configuration', 'org/apache/hadoop/fs/s3/S3Credentials', 'org/jets3t/service/security/AWSCredentials' }
>     stack: { 'org/apache/hadoop/fs/s3/Jets3tFileSystemStore', uninitialized 37, uninitialized 37, 'org/jets3t/service/security/AWSCredentials' }
>   Bytecode:
>     000: 2a2c b500 02bb 0003 59b7 0004 4e2d 2b2c
>     010: b600 05bb 0006 592d b600 072d b600 08b7
>     020: 0009 3a04 2abb 000a 5919 04b7 000b b500
>     030: 0ca7 0023 3a04 1904 b600 0ec1 000f 9900
>     040: 0c19 04b6 000e c000 0fbf bb00 1059 1904
>     050: b700 11bf 2abb 0012 592b b600 13b7 0014
>     060: b500 152a 2c12 1611 1000 b600 17b5 0018
>     070: b1
>   Exception Handler Table:
>     bci [19, 49] => handler: 52
>   Stackmap Table:
>     full_frame(@52,{Object[#194],Object[#195],Object[#196],Object[#197]},{Object[#198]})
>     append_frame(@74,Object[#198])
>     chop_frame(@84,1)
>  at org.apache.hadoop.fs.s3.S3FileSystem.createDefaultStore(S3FileSystem.java:119)
>  at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:109)
>  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
>  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
>  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
>  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
>  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
>  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
>  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
>  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>  at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>  at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>  at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>  at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
>  at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342)
>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>  at 
> 

[jira] [Created] (SPARK-23679) uiWebUrl show inproper URL when running on YARN

2018-03-14 Thread JIRA
Maciej Bryński created SPARK-23679:
--

 Summary: uiWebUrl show inproper URL when running on YARN
 Key: SPARK-23679
 URL: https://issues.apache.org/jira/browse/SPARK-23679
 Project: Spark
  Issue Type: Bug
  Components: Web UI
Affects Versions: 2.3.0
Reporter: Maciej Bryński


uiWebUrl returns a local URL.
Using it causes an HTTP ERROR 500:
{code}
Problem accessing /. Reason:

Server Error
Caused by:
javax.servlet.ServletException: Could not determine the proxy server for redirection
    at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.findRedirectUrl(AmIpFilter.java:205)
    at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:145)
    at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676)
    at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581)
    at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
    at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
    at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
    at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
    at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.spark_project.jetty.server.Server.handle(Server.java:524)
    at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
    at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
    at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
    at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
    at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
    at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
    at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
    at java.lang.Thread.run(Thread.java:745)
{code}

We should return the address of the YARN proxy instead.






[jira] [Assigned] (SPARK-23678) a more efficient partition strategy

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23678:


Assignee: Apache Spark

> a more efficient partition strategy
> ---
>
> Key: SPARK-23678
> URL: https://issues.apache.org/jira/browse/SPARK-23678
> Project: Spark
>  Issue Type: New Feature
>  Components: GraphX
>Affects Versions: 2.4.0
>Reporter: wenbinwei
>Assignee: Apache Spark
>Priority: Minor
>






[jira] [Commented] (SPARK-23678) a more efficient partition strategy

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398622#comment-16398622
 ] 

Apache Spark commented on SPARK-23678:
--

User 'weiwee' has created a pull request for this issue:
https://github.com/apache/spark/pull/20821

> a more efficient partition strategy
> ---
>
> Key: SPARK-23678
> URL: https://issues.apache.org/jira/browse/SPARK-23678
> Project: Spark
>  Issue Type: New Feature
>  Components: GraphX
>Affects Versions: 2.4.0
>Reporter: wenbinwei
>Priority: Minor
>






[jira] [Assigned] (SPARK-23678) a more efficient partition strategy

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23678:


Assignee: (was: Apache Spark)

> a more efficient partition strategy
> ---
>
> Key: SPARK-23678
> URL: https://issues.apache.org/jira/browse/SPARK-23678
> Project: Spark
>  Issue Type: New Feature
>  Components: GraphX
>Affects Versions: 2.4.0
>Reporter: wenbinwei
>Priority: Minor
>






[jira] [Created] (SPARK-23678) a more efficient partition strategy

2018-03-14 Thread wenbinwei (JIRA)
wenbinwei created SPARK-23678:
-

 Summary: a more efficient partition strategy
 Key: SPARK-23678
 URL: https://issues.apache.org/jira/browse/SPARK-23678
 Project: Spark
  Issue Type: New Feature
  Components: GraphX
Affects Versions: 2.4.0
Reporter: wenbinwei


Recently, I found a new partition strategy (called EdgePartitionTriangle), which 
is a combination of the partition strategy EdgePartition2D and the 
partition strategy CanonicalRandomVertexCut. This partition strategy has three 
advantages:
 1. a nicer bound on vertex replication, sqrt(2 * numParts).
 2. it colocates all edges between two vertices regardless of direction.
 3. the same work balance as EdgePartition2D.

See 
[https://github.com/weiwee/edgePartitionTri/blob/master/EdgePartitionTriangle.ipynb]

The main idea is to first partition virtually with EdgePartition2D, which gives 
the partitions

{(i,j)|i=1,2,..,k, j=1,2,..,k}

Then relocate the partitions by folding the virtual partitions, such as:

(1,0) and (0,1) -> (1, 0)

(2,1) and (1,2) -> (2, 1)

...

(k, k-1) and (k-1, k) -> (k, k -1)

 
Finally, map {(1,0), (2,0), (2,1), (3,0),(3,1),(3,2),...,(k, k-1)} to 
{0,1,...,k*(k-1) / 2}

The complete method needs to handle more details:

1. When numParts is not a triangle number, partitions are divided into two 
types: triangleParts and rests. The latter are partitioned by a different 
strategy.

2. When edges are virtually located in partition (a, a), they should be 
relocated to the partitions

{(a, 0), (a, 1),..., (a, a-1), (a+1, a),...,(k, a)}

to achieve better work balance.

Code: 
{code:java}
import org.apache.spark.graphx._

object EdgePartitionTriangle extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val mixingPrime: VertexId = 1125899906842597L
    // Largest number of rows whose triangle number does not exceed numParts
    val numRowTriParts = ((math.sqrt(1 + 8 * numParts) - 1) / 2).toInt
    // Number of partitions covered by the triangle
    val numTriParts = numRowTriParts * (numRowTriParts + 1) / 2
    val segmentFactor = 100 // positive even numbers
    val numSegments = (segmentFactor * math.sqrt(4 * numParts * numTriParts)).toInt
    val segRow = (math.abs(src * mixingPrime) % numSegments).toInt
    val segCol = (math.abs(dst * mixingPrime) % numSegments).toInt
    var row = segRow / (segmentFactor * numRowTriParts)
    var col = segCol / (segmentFactor * numRowTriParts)
    if (math.max(segRow, segCol) >= 2 * segmentFactor * numTriParts) {
      // Segments beyond the triangle go to the remaining (non-triangle) partitions
      row = numRowTriParts + 1
      col = math.min(segRow, segCol) % (numParts - numTriParts)
    } else if (row == col) {
      // Diagonal cell (a, a): relocate its edges for better work balance
      val ind = math.min(segRow % numRowTriParts, segCol % numRowTriParts)
      col = (math.min(2 * numRowTriParts - ind - 1, ind) + row + 1) % (numRowTriParts + 1)
    }
    // Fold (row, col) and (col, row) onto the same triangular index
    if (row > col) row * (row - 1) / 2 + col else col * (col - 1) / 2 + row
  }
}
{code}
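
The folding step on its own can be sketched without GraphX. This is only an 
illustration of the index mapping described above, not the code from the 
proposal: the names TriangleFoldSketch and foldToTriangle are hypothetical, and 
diagonal cells (a, a) are rejected here because they need the separate 
relocation step.

```scala
object TriangleFoldSketch {
  // Fold a virtual 2D partition (i, j), i != j, onto the lower triangle and
  // return its position in the enumeration (1,0), (2,0), (2,1), (3,0), ...
  def foldToTriangle(i: Int, j: Int): Int = {
    require(i >= 0 && j >= 0 && i != j, "diagonal cells need separate handling")
    val row = math.max(i, j)
    val col = math.min(i, j)
    row * (row - 1) / 2 + col
  }

  def main(args: Array[String]): Unit = {
    val cells = Seq((1, 0), (0, 1), (2, 1), (1, 2), (3, 2))
    cells.foreach { case (i, j) =>
      println(s"($i, $j) -> ${foldToTriangle(i, j)}")
    }
  }
}
```

Both (i, j) and (j, i) land on the same index, which is what colocates the 
edges between two vertices regardless of direction.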
 

 

 






[jira] [Updated] (SPARK-23677) Selecting columns from joined DataFrames with the same origin yields wrong results

2018-03-14 Thread Martin Mauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Mauch updated SPARK-23677:
-
Affects Version/s: 2.2.1

> Selecting columns from joined DataFrames with the same origin yields wrong 
> results
> --
>
> Key: SPARK-23677
> URL: https://issues.apache.org/jira/browse/SPARK-23677
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.2.1, 2.3.0
>Reporter: Martin Mauch
>Priority: Major
>






[jira] [Created] (SPARK-23677) Selecting columns from joined DataFrames with the same origin yields wrong results

2018-03-14 Thread Martin Mauch (JIRA)
Martin Mauch created SPARK-23677:


 Summary: Selecting columns from joined DataFrames with the same 
origin yields wrong results
 Key: SPARK-23677
 URL: https://issues.apache.org/jira/browse/SPARK-23677
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, SQL
Affects Versions: 2.3.0
Reporter: Martin Mauch


When trying to join two DataFrames with the same origin DataFrame and later 
selecting columns from the join, Spark can't distinguish between the columns 
and gives a wrong (or at least very surprising) result. One can work around 
this using expr.

Here is a minimal example:

 
{code:java}
import org.apache.spark.sql.functions.expr
import spark.implicits._

val edf = Seq((1), (2), (3), (4), (5)).toDF("num")
val big = edf.where(edf("num") > 2).alias("big")
val small = edf.where(edf("num") < 4).alias("small")

// Selecting through the DataFrame references returns the same column twice
small.join(big, expr("big.num == (small.num + 1)"))
  .select(small("num"), big("num")).show()
// +---+---+
// |num|num|
// +---+---+
// |  2|  2|
// |  3|  3|
// +---+---+

// Selecting through expr with the aliases gives the expected result
small.join(big, expr("big.num == (small.num + 1)"))
  .select(expr("small.num"), expr("big.num")).show()
// +---+---+
// |num|num|
// +---+---+
// |  2|  3|
// |  3|  4|
// +---+---+
{code}
 






[jira] [Commented] (SPARK-23665) Add adaptive algorithm to select query result collect method

2018-03-14 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398358#comment-16398358
 ] 

Takeshi Yamamuro commented on SPARK-23665:
--

IMHO it's OK to do this on the application side. Is there any benefit to 
supporting it in the thriftserver implementation?

> Add adaptive algorithm to select query result collect method
> 
>
> Key: SPARK-23665
> URL: https://issues.apache.org/jira/browse/SPARK-23665
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1, 2.3.0
>Reporter: zhoukang
>Priority: Major
>
> Currently, we use configuration like 
> {code:java}
> spark.sql.thriftServer.incrementalCollect
> {code}
> to specify the query result collect method.
> Actually, we can estimate the size of the result and select the collect method 
> automatically. 
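
The idea can be sketched as follows, assuming a size estimate is available before collecting. This is a plain-Java illustration, not thriftserver code: `CollectMethodChooser`, the `CollectMethod` enum, and the 64 MB threshold are hypothetical, and how the estimate would be obtained is left open.

```java
// Hypothetical sketch: choose a collect method from an estimated result size.
final class CollectMethodChooser {
    enum CollectMethod { FULL, INCREMENTAL }

    /**
     * Returns FULL when the estimate fits under the threshold, INCREMENTAL otherwise.
     * A negative estimate means "unknown" and falls back to the memory-safe path.
     */
    static CollectMethod choose(long estimatedResultBytes, long maxInMemoryBytes) {
        if (estimatedResultBytes < 0) {
            return CollectMethod.INCREMENTAL;
        }
        return estimatedResultBytes <= maxInMemoryBytes
                ? CollectMethod.FULL
                : CollectMethod.INCREMENTAL;
    }

    public static void main(String[] args) {
        long threshold = 64L * 1024 * 1024; // assumed threshold, not a Spark default

        System.out.println(choose(10L * 1024 * 1024, threshold));
        System.out.println(choose(2L * 1024 * 1024 * 1024, threshold));
        System.out.println(choose(-1L, threshold));
    }
}
```

Treating an unknown size as INCREMENTAL is a design choice in this sketch: it trades some latency for a lower risk of exhausting memory.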






[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-14 Thread Deepansh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398237#comment-16398237
 ] 

Deepansh commented on SPARK-23650:
--

Attaching more logs.

> Slow SparkR udf (dapply)
> 
>
> Key: SPARK-23650
> URL: https://issues.apache.org/jira/browse/SPARK-23650
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell, SparkR, Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Deepansh
>Priority: Major
> Attachments: sparkR_log2.txt, sparkRlag.txt
>
>
> For example, I am reading streams from Kafka and want to apply a model built 
> in R to those streams. For this, I am using dapply.
> My code is:
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source", kafka.bootstrap.servers = 
> "localhost:9092", topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   i_model <- SparkR:::value(randomBr)
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     y <- toJSON(y)
>     x[row, "value"] <- y
>   }
>   x
> }, schema)
> Every time Kafka streams are fetched, the dapply method creates a new 
> runner thread and ships the variables again, which causes a huge lag (~2s for 
> shipping the model) every time. I even tried without broadcast variables, but it 
> takes the same time to ship the variables. Can some other technique be applied to 
> improve its performance?






[jira] [Updated] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-14 Thread Deepansh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deepansh updated SPARK-23650:
-
Attachment: sparkR_log2.txt

> Slow SparkR udf (dapply)
> 
>
> Key: SPARK-23650
> URL: https://issues.apache.org/jira/browse/SPARK-23650
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell, SparkR, Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Deepansh
>Priority: Major
> Attachments: sparkR_log2.txt, sparkRlag.txt
>
>
> For example, I am reading streams from Kafka and want to apply a model built 
> in R to those streams. For this, I am using dapply.
> My code is:
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source", kafka.bootstrap.servers = 
> "localhost:9092", topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   i_model <- SparkR:::value(randomBr)
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     y <- toJSON(y)
>     x[row, "value"] <- y
>   }
>   x
> }, schema)
> Every time Kafka streams are fetched, the dapply method creates a new 
> runner thread and ships the variables again, which causes a huge lag (~2s for 
> shipping the model) every time. I even tried without broadcast variables, but it 
> takes the same time to ship the variables. Can some other technique be applied to 
> improve its performance?






[jira] [Commented] (SPARK-23676) Support left join codegen in SortMergeJoinExec

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398196#comment-16398196
 ] 

Apache Spark commented on SPARK-23676:
--

User 'heary-cao' has created a pull request for this issue:
https://github.com/apache/spark/pull/20820

> Support left join codegen in SortMergeJoinExec
> --
>
> Key: SPARK-23676
> URL: https://issues.apache.org/jira/browse/SPARK-23676
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: caoxuewen
>Priority: Major
>
> This PR generates Java code that implements the LeftOuter join in 
> `SortMergeJoinExec` directly, without using an iterator. 
> The generated code improves runtime performance.
> joinBenchmark result: **1.3x**
> ```
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
> Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz
> left sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
> --
> left merge join wholestage=off 2439 / 2575 0.9 1163.0 1.0X
> left merge join wholestage=on 1890 / 1904 1.1 901.1 1.3X
> ```
> Benchmark program
> ```
>  val N = 2 << 20
>  runBenchmark("left sort merge join", N) {
>  val df1 = sparkSession.range(N)
>  .selectExpr(s"(id * 15485863) % ${N*10} as k1")
>  val df2 = sparkSession.range(N)
>  .selectExpr(s"(id * 15485867) % ${N*10} as k2")
>  val df = df1.join(df2, col("k1") === col("k2"), "left")
>  
> assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
>  df.count()
> ```
> code example
> ```
> val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1")
> val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2")
> df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), 
> "left").collect
> ```
> Generated code
> ```
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */ return new GeneratedIteratorForCodegenStage5(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ // codegenStageId=5
> /* 006 */ final class GeneratedIteratorForCodegenStage5 extends 
> org.apache.spark.sql.execution.BufferedRowIterator {
> /* 007 */ private Object[] references;
> /* 008 */ private scala.collection.Iterator[] inputs;
> /* 009 */ private scala.collection.Iterator smj_leftInput;
> /* 010 */ private scala.collection.Iterator smj_rightInput;
> /* 011 */ private InternalRow smj_leftRow;
> /* 012 */ private InternalRow smj_rightRow;
> /* 013 */ private long smj_value2;
> /* 014 */ private 
> org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
> /* 015 */ private long smj_value3;
> /* 016 */ private long smj_value4;
> /* 017 */ private long smj_value5;
> /* 018 */ private long smj_value6;
> /* 019 */ private boolean smj_isNull2;
> /* 020 */ private long smj_value7;
> /* 021 */ private boolean smj_isNull3;
> /* 022 */ private 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] 
> smj_mutableStateArray1 = new 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
> /* 023 */ private 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] 
> smj_mutableStateArray2 = new 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
> /* 024 */ private UnsafeRow[] smj_mutableStateArray = new UnsafeRow[1];
> /* 025 */
> /* 026 */ public GeneratedIteratorForCodegenStage5(Object[] references) {
> /* 027 */ this.references = references;
> /* 028 */ }
> /* 029 */
> /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
> /* 031 */ partitionIndex = index;
> /* 032 */ this.inputs = inputs;
> /* 033 */ smj_leftInput = inputs[0];
> /* 034 */ smj_rightInput = inputs[1];
> /* 035 */
> /* 036 */ smj_matches = new 
> org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647, 
> 2147483647);
> /* 037 */ smj_mutableStateArray[0] = new UnsafeRow(4);
> /* 038 */ smj_mutableStateArray1[0] = new 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_mutableStateArray[0],
>  0);
> /* 039 */ smj_mutableStateArray2[0] = new 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_mutableStateArray1[0],
>  4);
> /* 040 */
> /* 041 */ }
> /* 042 */
> /* 043 */ private void writeJoinRows() throws java.io.IOException {
> /* 044 */ smj_mutableStateArray2[0].zeroOutNullBytes();
> /* 045 */
> /* 046 */ smj_mutableStateArray2[0].write(0, smj_value4);
> /* 047 */
> /* 048 */ smj_mutableStateArray2[0].write(1, smj_value5);
> /* 049 */
> /* 050 */ if (smj_isNull2) {
> /* 051 */ smj_mutableStateArray2[0].setNullAt(2);
> /* 052 */ } else {
> /* 053 */ smj_mutableStateArray2[0].write(2, smj_value6);
> /* 054 */ }
> /* 055 */
> /* 056 */ if (smj_isNull3) {
> /* 057 */ 

[jira] [Assigned] (SPARK-23676) Support left join codegen in SortMergeJoinExec

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23676:


Assignee: (was: Apache Spark)

> Support left join codegen in SortMergeJoinExec
> --
>
> Key: SPARK-23676
> URL: https://issues.apache.org/jira/browse/SPARK-23676
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: caoxuewen
>Priority: Major
>
> This PR generates Java code that implements the LeftOuter join in 
> `SortMergeJoinExec` directly, without using an iterator. 
> The generated code improves runtime performance.
> joinBenchmark result: **1.3x**
> ```
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
> Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz
> left sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
> --
> left merge join wholestage=off 2439 / 2575 0.9 1163.0 1.0X
> left merge join wholestage=on 1890 / 1904 1.1 901.1 1.3X
> ```
> Benchmark program
> ```
>  val N = 2 << 20
>  runBenchmark("left sort merge join", N) {
>  val df1 = sparkSession.range(N)
>  .selectExpr(s"(id * 15485863) % ${N*10} as k1")
>  val df2 = sparkSession.range(N)
>  .selectExpr(s"(id * 15485867) % ${N*10} as k2")
>  val df = df1.join(df2, col("k1") === col("k2"), "left")
>  
> assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
>  df.count()
> ```
> code example
> ```
> val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1")
> val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2")
> df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), 
> "left").collect
> ```
> Generated code
> ```
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */ return new GeneratedIteratorForCodegenStage5(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ // codegenStageId=5
> /* 006 */ final class GeneratedIteratorForCodegenStage5 extends 
> org.apache.spark.sql.execution.BufferedRowIterator {
> /* 007 */ private Object[] references;
> /* 008 */ private scala.collection.Iterator[] inputs;
> /* 009 */ private scala.collection.Iterator smj_leftInput;
> /* 010 */ private scala.collection.Iterator smj_rightInput;
> /* 011 */ private InternalRow smj_leftRow;
> /* 012 */ private InternalRow smj_rightRow;
> /* 013 */ private long smj_value2;
> /* 014 */ private 
> org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
> /* 015 */ private long smj_value3;
> /* 016 */ private long smj_value4;
> /* 017 */ private long smj_value5;
> /* 018 */ private long smj_value6;
> /* 019 */ private boolean smj_isNull2;
> /* 020 */ private long smj_value7;
> /* 021 */ private boolean smj_isNull3;
> /* 022 */ private 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] 
> smj_mutableStateArray1 = new 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
> /* 023 */ private 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] 
> smj_mutableStateArray2 = new 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
> /* 024 */ private UnsafeRow[] smj_mutableStateArray = new UnsafeRow[1];
> /* 025 */
> /* 026 */ public GeneratedIteratorForCodegenStage5(Object[] references) {
> /* 027 */ this.references = references;
> /* 028 */ }
> /* 029 */
> /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
> /* 031 */ partitionIndex = index;
> /* 032 */ this.inputs = inputs;
> /* 033 */ smj_leftInput = inputs[0];
> /* 034 */ smj_rightInput = inputs[1];
> /* 035 */
> /* 036 */ smj_matches = new 
> org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647, 
> 2147483647);
> /* 037 */ smj_mutableStateArray[0] = new UnsafeRow(4);
> /* 038 */ smj_mutableStateArray1[0] = new 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_mutableStateArray[0],
>  0);
> /* 039 */ smj_mutableStateArray2[0] = new 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_mutableStateArray1[0],
>  4);
> /* 040 */
> /* 041 */ }
> /* 042 */
> /* 043 */ private void writeJoinRows() throws java.io.IOException {
> /* 044 */ smj_mutableStateArray2[0].zeroOutNullBytes();
> /* 045 */
> /* 046 */ smj_mutableStateArray2[0].write(0, smj_value4);
> /* 047 */
> /* 048 */ smj_mutableStateArray2[0].write(1, smj_value5);
> /* 049 */
> /* 050 */ if (smj_isNull2) {
> /* 051 */ smj_mutableStateArray2[0].setNullAt(2);
> /* 052 */ } else {
> /* 053 */ smj_mutableStateArray2[0].write(2, smj_value6);
> /* 054 */ }
> /* 055 */
> /* 056 */ if (smj_isNull3) {
> /* 057 */ smj_mutableStateArray2[0].setNullAt(3);
> /* 058 */ } else {
> /* 059 */ smj_mutableStateArray2[0].write(3, 

[jira] [Assigned] (SPARK-23676) Support left join codegen in SortMergeJoinExec

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23676:


Assignee: Apache Spark

> Support left join codegen in SortMergeJoinExec
> --
>
> Key: SPARK-23676
> URL: https://issues.apache.org/jira/browse/SPARK-23676
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: caoxuewen
>Assignee: Apache Spark
>Priority: Major
>
> This PR generates Java code that implements the LeftOuter join in 
> `SortMergeJoinExec` directly, without using an iterator. 
> The generated code improves runtime performance.
> joinBenchmark result: **1.3x**
> ```
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
> Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz
> left sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
> --
> left merge join wholestage=off 2439 / 2575 0.9 1163.0 1.0X
> left merge join wholestage=on 1890 / 1904 1.1 901.1 1.3X
> ```
> Benchmark program
> ```
>  val N = 2 << 20
>  runBenchmark("left sort merge join", N) {
>  val df1 = sparkSession.range(N)
>  .selectExpr(s"(id * 15485863) % ${N*10} as k1")
>  val df2 = sparkSession.range(N)
>  .selectExpr(s"(id * 15485867) % ${N*10} as k2")
>  val df = df1.join(df2, col("k1") === col("k2"), "left")
>  
> assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
>  df.count()
> ```
> code example
> ```
> val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1")
> val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2")
> df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), 
> "left").collect
> ```
> Generated code
> ```
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */ return new GeneratedIteratorForCodegenStage5(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ // codegenStageId=5
> /* 006 */ final class GeneratedIteratorForCodegenStage5 extends 
> org.apache.spark.sql.execution.BufferedRowIterator {
> /* 007 */ private Object[] references;
> /* 008 */ private scala.collection.Iterator[] inputs;
> /* 009 */ private scala.collection.Iterator smj_leftInput;
> /* 010 */ private scala.collection.Iterator smj_rightInput;
> /* 011 */ private InternalRow smj_leftRow;
> /* 012 */ private InternalRow smj_rightRow;
> /* 013 */ private long smj_value2;
> /* 014 */ private 
> org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
> /* 015 */ private long smj_value3;
> /* 016 */ private long smj_value4;
> /* 017 */ private long smj_value5;
> /* 018 */ private long smj_value6;
> /* 019 */ private boolean smj_isNull2;
> /* 020 */ private long smj_value7;
> /* 021 */ private boolean smj_isNull3;
> /* 022 */ private 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] 
> smj_mutableStateArray1 = new 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
> /* 023 */ private 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] 
> smj_mutableStateArray2 = new 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
> /* 024 */ private UnsafeRow[] smj_mutableStateArray = new UnsafeRow[1];
> /* 025 */
> /* 026 */ public GeneratedIteratorForCodegenStage5(Object[] references) {
> /* 027 */ this.references = references;
> /* 028 */ }
> /* 029 */
> /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
> /* 031 */ partitionIndex = index;
> /* 032 */ this.inputs = inputs;
> /* 033 */ smj_leftInput = inputs[0];
> /* 034 */ smj_rightInput = inputs[1];
> /* 035 */
> /* 036 */ smj_matches = new 
> org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647, 
> 2147483647);
> /* 037 */ smj_mutableStateArray[0] = new UnsafeRow(4);
> /* 038 */ smj_mutableStateArray1[0] = new 
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_mutableStateArray[0],
>  0);
> /* 039 */ smj_mutableStateArray2[0] = new 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_mutableStateArray1[0],
>  4);
> /* 040 */
> /* 041 */ }
> /* 042 */
> /* 043 */ private void writeJoinRows() throws java.io.IOException {
> /* 044 */ smj_mutableStateArray2[0].zeroOutNullBytes();
> /* 045 */
> /* 046 */ smj_mutableStateArray2[0].write(0, smj_value4);
> /* 047 */
> /* 048 */ smj_mutableStateArray2[0].write(1, smj_value5);
> /* 049 */
> /* 050 */ if (smj_isNull2) {
> /* 051 */ smj_mutableStateArray2[0].setNullAt(2);
> /* 052 */ } else {
> /* 053 */ smj_mutableStateArray2[0].write(2, smj_value6);
> /* 054 */ }
> /* 055 */
> /* 056 */ if (smj_isNull3) {
> /* 057 */ smj_mutableStateArray2[0].setNullAt(3);
> /* 058 */ } else {
> /* 059 */ 

[jira] [Created] (SPARK-23676) Support left join codegen in SortMergeJoinExec

2018-03-14 Thread caoxuewen (JIRA)
caoxuewen created SPARK-23676:
-

 Summary: Support left join codegen in SortMergeJoinExec
 Key: SPARK-23676
 URL: https://issues.apache.org/jira/browse/SPARK-23676
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: caoxuewen


This PR generates Java code that implements the LeftOuter join in 
`SortMergeJoinExec` directly, without using an iterator. 
The generated code improves runtime performance.
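
For readers unfamiliar with the operator, the following plain-Java sketch shows the left outer sort-merge logic that such generated code has to cover. It is a simplified illustration on sorted `long` keys, not the code this PR generates: it rewinds an index instead of buffering matches, and `LeftOuterSortMergeJoinSketch` and `join` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of a left outer sort-merge join; not Spark code.
final class LeftOuterSortMergeJoinSketch {
    /**
     * Joins two ascending-sorted key arrays. Each output row is {leftKey, rightKey},
     * with rightKey == null when the left key has no match on the right.
     */
    static List<Long[]> join(long[] left, long[] right) {
        List<Long[]> out = new ArrayList<>();
        int r = 0; // start of the candidate run on the right side
        for (long l : left) {
            // Skip right keys that are smaller than the current left key.
            while (r < right.length && right[r] < l) {
                r++;
            }
            // Scan the run of equal keys without consuming it,
            // so a duplicate left key sees the same matches again.
            boolean matched = false;
            for (int m = r; m < right.length && right[m] == l; m++) {
                out.add(new Long[] {l, right[m]});
                matched = true;
            }
            // Left outer semantics: keep the left row even without a match.
            if (!matched) {
                out.add(new Long[] {l, null});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] left = {1, 2, 2, 4};
        long[] right = {2, 3, 4, 4};
        for (Long[] row : join(left, right)) {
            System.out.println(row[0] + ", " + row[1]);
        }
    }
}
```

Both inputs must already be sorted by key, which is what the sort step of a sort-merge join provides.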

joinBenchmark result: **1.3x**
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz
left sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
--
left merge join wholestage=off 2439 / 2575 0.9 1163.0 1.0X
left merge join wholestage=on 1890 / 1904 1.1 901.1 1.3X
```
Benchmark program
```
 val N = 2 << 20
 runBenchmark("left sort merge join", N) {
 val df1 = sparkSession.range(N)
 .selectExpr(s"(id * 15485863) % ${N*10} as k1")
 val df2 = sparkSession.range(N)
 .selectExpr(s"(id * 15485867) % ${N*10} as k2")
 val df = df1.join(df2, col("k1") === col("k2"), "left")
 
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
 df.count()
 }
```
code example
```
val df1 = spark.range(2 << 20).selectExpr("id as k1", "id * 2 as v1")
val df2 = spark.range(2 << 20).selectExpr("id as k2", "id * 3 as v2")
df1.join(df2, col("k1") === col("k2") && col("v1") < col("v2"), "left").collect
```
Generated code
```
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage5(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=5
/* 006 */ final class GeneratedIteratorForCodegenStage5 extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private scala.collection.Iterator smj_leftInput;
/* 010 */ private scala.collection.Iterator smj_rightInput;
/* 011 */ private InternalRow smj_leftRow;
/* 012 */ private InternalRow smj_rightRow;
/* 013 */ private long smj_value2;
/* 014 */ private 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray smj_matches;
/* 015 */ private long smj_value3;
/* 016 */ private long smj_value4;
/* 017 */ private long smj_value5;
/* 018 */ private long smj_value6;
/* 019 */ private boolean smj_isNull2;
/* 020 */ private long smj_value7;
/* 021 */ private boolean smj_isNull3;
/* 022 */ private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] 
smj_mutableStateArray1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
/* 023 */ private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] 
smj_mutableStateArray2 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 024 */ private UnsafeRow[] smj_mutableStateArray = new UnsafeRow[1];
/* 025 */
/* 026 */ public GeneratedIteratorForCodegenStage5(Object[] references) {
/* 027 */ this.references = references;
/* 028 */ }
/* 029 */
/* 030 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */ partitionIndex = index;
/* 032 */ this.inputs = inputs;
/* 033 */ smj_leftInput = inputs[0];
/* 034 */ smj_rightInput = inputs[1];
/* 035 */
/* 036 */ smj_matches = new 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray(2147483647, 
2147483647);
/* 037 */ smj_mutableStateArray[0] = new UnsafeRow(4);
/* 038 */ smj_mutableStateArray1[0] = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(smj_mutableStateArray[0],
 0);
/* 039 */ smj_mutableStateArray2[0] = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(smj_mutableStateArray1[0],
 4);
/* 040 */
/* 041 */ }
/* 042 */
/* 043 */ private void writeJoinRows() throws java.io.IOException {
/* 044 */ smj_mutableStateArray2[0].zeroOutNullBytes();
/* 045 */
/* 046 */ smj_mutableStateArray2[0].write(0, smj_value4);
/* 047 */
/* 048 */ smj_mutableStateArray2[0].write(1, smj_value5);
/* 049 */
/* 050 */ if (smj_isNull2) {
/* 051 */ smj_mutableStateArray2[0].setNullAt(2);
/* 052 */ } else {
/* 053 */ smj_mutableStateArray2[0].write(2, smj_value6);
/* 054 */ }
/* 055 */
/* 056 */ if (smj_isNull3) {
/* 057 */ smj_mutableStateArray2[0].setNullAt(3);
/* 058 */ } else {
/* 059 */ smj_mutableStateArray2[0].write(3, smj_value7);
/* 060 */ }
/* 061 */ append(smj_mutableStateArray[0].copy());
/* 062 */
/* 063 */ }
/* 064 */
/* 065 */ private boolean findNextJoinRows(
/* 066 */ scala.collection.Iterator leftIter,
/* 067 */ scala.collection.Iterator rightIter) {
/* 068 */ smj_leftRow = null;
/* 069 */ int comp = 0;
/* 070 */ while (smj_leftRow == null) {
/* 071 */ if (!leftIter.hasNext()) return false;
/* 072 */ smj_leftRow = (InternalRow) leftIter.next();
/* 073 */
/* 

[jira] [Commented] (SPARK-23618) docker-image-tool.sh Fails While Building Image

2018-03-14 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398175#comment-16398175
 ] 

Felix Cheung commented on SPARK-23618:
--

I think this is because the user isn't in the user role list.

You can add him to Contributors here: 
https://issues.apache.org/jira/plugins/servlet/project-config/SPARK/roles

> docker-image-tool.sh Fails While Building Image
> ---
>
> Key: SPARK-23618
> URL: https://issues.apache.org/jira/browse/SPARK-23618
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Ninad Ingole
>Priority: Major
>
> I am trying to build the Kubernetes image for version 2.3.0 using 
> {code:java}
> ./bin/docker-image-tool.sh -r ninadingole/spark-docker -t v2.3.0 build
> {code}
> but docker build fails with the following 
> error:
> {code:java}
> "docker build" requires exactly 1 argument.
> See 'docker build --help'.
> Usage: docker build [OPTIONS] PATH | URL | - [flags]
> Build an image from a Dockerfile
> {code}
>  
> I am executing the command from within the Spark distribution directory. Please 
> let me know what the issue is.
>  






[jira] [Assigned] (SPARK-23675) Title add spark logo, use spark logo image

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23675:


Assignee: (was: Apache Spark)

> Title add spark logo, use spark logo image
> --
>
> Key: SPARK-23675
> URL: https://issues.apache.org/jira/browse/SPARK-23675
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: guoxiaolongzte
>Priority: Minor
> Attachments: flink.png, kafka.png, nifi.png, spark_fix_after.png, 
> spark_fix_before.png, storm.png, storm.png, yarn.png, yarn.png
>
>
> Add the Spark logo to the title, using the Spark logo image. This follows the 
> UIs of other big data systems, so I think Spark should add it too.
> spark fix before: !spark_fix_before.png!
>  
> spark fix after: !spark_fix_after.png!
>  
> reference kafka ui: !kafka.png!
>  
> reference storm ui: !storm.png!
>  
> reference yarn ui: !yarn.png!
>  
> reference nifi ui: !nifi.png!
>  
> reference flink ui: !flink.png!
>  
>  






[jira] [Commented] (SPARK-23675) Title add spark logo, use spark logo image

2018-03-14 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398172#comment-16398172
 ] 

Apache Spark commented on SPARK-23675:
--

User 'guoxiaolongzte' has created a pull request for this issue:
https://github.com/apache/spark/pull/20818

> Title add spark logo, use spark logo image
> --
>
> Key: SPARK-23675
> URL: https://issues.apache.org/jira/browse/SPARK-23675
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: guoxiaolongzte
>Priority: Minor
> Attachments: flink.png, kafka.png, nifi.png, spark_fix_after.png, 
> spark_fix_before.png, storm.png, storm.png, yarn.png, yarn.png
>
>
> Add the Spark logo to the title, using the Spark logo image. This follows the 
> UIs of other big data systems, so I think Spark should add it too.
> spark fix before: !spark_fix_before.png!
>  
> spark fix after: !spark_fix_after.png!
>  
> reference kafka ui: !kafka.png!
>  
> reference storm ui: !storm.png!
>  
> reference yarn ui: !yarn.png!
>  
> reference nifi ui: !nifi.png!
>  
> reference flink ui: !flink.png!
>  
>  






[jira] [Commented] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-14 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398174#comment-16398174
 ] 

Felix Cheung commented on SPARK-23650:
--

I see one RRunner - do you have more of the log?

> Slow SparkR udf (dapply)
> 
>
> Key: SPARK-23650
> URL: https://issues.apache.org/jira/browse/SPARK-23650
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell, SparkR, Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Deepansh
>Priority: Major
> Attachments: sparkRlag.txt
>
>
> For example, I am reading streams from Kafka and want to apply a model built 
> in R to those streams. For this, I am using dapply.
> My code is:
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source", kafka.bootstrap.servers = 
> "localhost:9092", topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   i_model <- SparkR:::value(randomBr)
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     y <- toJSON(y)
>     x[row, "value"] <- y
>   }
>   x
> }, schema)
> Every time Kafka streams are fetched, the dapply method creates a new 
> runner thread and ships the variables again, which causes a huge lag (~2s for 
> shipping the model) every time. I even tried without broadcast variables, but it 
> takes the same time to ship the variables. Can some other technique be applied to 
> improve its performance?






[jira] [Assigned] (SPARK-23675) Title add spark logo, use spark logo image

2018-03-14 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-23675:


Assignee: Apache Spark

> Title add spark logo, use spark logo image
> --
>
> Key: SPARK-23675
> URL: https://issues.apache.org/jira/browse/SPARK-23675
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: guoxiaolongzte
>Assignee: Apache Spark
>Priority: Minor
> Attachments: flink.png, kafka.png, nifi.png, spark_fix_after.png, 
> spark_fix_before.png, storm.png, storm.png, yarn.png, yarn.png
>
>
> Add the Spark logo to the title, using the Spark logo image. This follows the 
> UIs of other big data systems, so I think Spark should add it too.
> spark fix before: !spark_fix_before.png!
>  
> spark fix after: !spark_fix_after.png!
>  
> reference kafka ui: !kafka.png!
>  
> reference storm ui: !storm.png!
>  
> reference yarn ui: !yarn.png!
>  
> reference nifi ui: !nifi.png!
>  
> reference flink ui: !flink.png!
>  
>  






[jira] [Updated] (SPARK-23675) Title add spark logo, use spark logo image

2018-03-14 Thread guoxiaolongzte (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

guoxiaolongzte updated SPARK-23675:
---
Description: 
Add the Spark logo to the title, using the Spark logo image. This follows the 
UIs of other big data systems, so I think Spark should add it too.

spark fix before: !spark_fix_before.png!

 

spark fix after: !spark_fix_after.png!

 

reference kafka ui: !kafka.png!

 

reference storm ui: !storm.png!

 

reference yarn ui: !yarn.png!

 

reference nifi ui: !nifi.png!

 

reference flink ui: !flink.png!

 

 

  was:Title add spark logo, use spark logo image


> Title add spark logo, use spark logo image
> --
>
> Key: SPARK-23675
> URL: https://issues.apache.org/jira/browse/SPARK-23675
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: guoxiaolongzte
>Priority: Minor
> Attachments: flink.png, kafka.png, nifi.png, spark_fix_after.png, 
> spark_fix_before.png, storm.png, storm.png, yarn.png, yarn.png
>
>
> Add the Spark logo to the title, using the Spark logo image. For reference,
> see the UIs of other big data systems; I think Spark should add it too.
> Spark before the fix: !spark_fix_before.png!
>
> Spark after the fix: !spark_fix_after.png!
>
> Kafka UI for reference: !kafka.png!
>
> Storm UI for reference: !storm.png!
>
> YARN UI for reference: !yarn.png!
>
> NiFi UI for reference: !nifi.png!
>
> Flink UI for reference: !flink.png!






[jira] [Updated] (SPARK-23675) Title add spark logo, use spark logo image

2018-03-14 Thread guoxiaolongzte (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

guoxiaolongzte updated SPARK-23675:
---
Attachment: yarn.png

> Title add spark logo, use spark logo image
> --
>
> Key: SPARK-23675
> URL: https://issues.apache.org/jira/browse/SPARK-23675
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: guoxiaolongzte
>Priority: Minor
> Attachments: flink.png, kafka.png, nifi.png, spark_fix_after.png, 
> spark_fix_before.png, storm.png, storm.png, yarn.png, yarn.png
>
>
> Title add spark logo, use spark logo image






[jira] [Updated] (SPARK-23675) Title add spark logo, use spark logo image

2018-03-14 Thread guoxiaolongzte (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

guoxiaolongzte updated SPARK-23675:
---
Attachment: storm.png

> Title add spark logo, use spark logo image
> --
>
> Key: SPARK-23675
> URL: https://issues.apache.org/jira/browse/SPARK-23675
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: guoxiaolongzte
>Priority: Minor
> Attachments: flink.png, kafka.png, nifi.png, spark_fix_after.png, 
> spark_fix_before.png, storm.png, storm.png, yarn.png
>
>
> Title add spark logo, use spark logo image






[jira] [Updated] (SPARK-23675) Title add spark logo, use spark logo image

2018-03-14 Thread guoxiaolongzte (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

guoxiaolongzte updated SPARK-23675:
---
Attachment: spark_fix_after.png
            flink.png
            nifi.png
            yarn.png
            storm.png
            kafka.png
            spark_fix_before.png

> Title add spark logo, use spark logo image
> --
>
> Key: SPARK-23675
> URL: https://issues.apache.org/jira/browse/SPARK-23675
> Project: Spark
>  Issue Type: Improvement
>  Components: Web UI
>Affects Versions: 2.4.0
>Reporter: guoxiaolongzte
>Priority: Minor
> Attachments: flink.png, kafka.png, nifi.png, spark_fix_after.png, 
> spark_fix_before.png, storm.png, yarn.png
>
>
> Title add spark logo, use spark logo image






[jira] [Created] (SPARK-23675) Title add spark logo, use spark logo image

2018-03-14 Thread guoxiaolongzte (JIRA)

guoxiaolongzte created SPARK-23675:
--

 Summary: Title add spark logo, use spark logo image
 Key: SPARK-23675
 URL: https://issues.apache.org/jira/browse/SPARK-23675
 Project: Spark
  Issue Type: Improvement
  Components: Web UI
Affects Versions: 2.4.0
Reporter: guoxiaolongzte


Title add spark logo, use spark logo image






[jira] [Updated] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-14 Thread Deepansh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deepansh updated SPARK-23650:
-
Attachment: sparkRlag.txt

> Slow SparkR udf (dapply)
> 
>
> Key: SPARK-23650
> URL: https://issues.apache.org/jira/browse/SPARK-23650
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell, SparkR, Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Deepansh
>Priority: Major
> Attachments: sparkRlag.txt
>
>
> For example, I am reading streams from Kafka and want to apply a model built
> in R to them. For this, I am using dapply.
> My code is:
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source",
>                      kafka.bootstrap.servers = "localhost:9092",
>                      topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   # Read the broadcast model on the worker.
>   i_model <- SparkR:::value(randomBr)
>   for (row in seq_len(nrow(x))) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     x[row, "value"] <- toJSON(y)
>   }
>   x
> }, schema)
> Every time Kafka streams are fetched, the dapply method creates a new runner
> thread and ships the variables again, which causes a large lag (~2s to ship
> the model) each time. I also tried without broadcast variables, but shipping
> the variables takes the same time. Can some other technique be applied to
> improve its performance?






[jira] [Updated] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-14 Thread Deepansh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deepansh updated SPARK-23650:
-
Attachment: (was: sparkRlag.txt)

> Slow SparkR udf (dapply)
> 
>
> Key: SPARK-23650
> URL: https://issues.apache.org/jira/browse/SPARK-23650
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell, SparkR, Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Deepansh
>Priority: Major
>
> For example, I am reading streams from Kafka and want to apply a model built
> in R to them. For this, I am using dapply.
> My code is:
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source",
>                      kafka.bootstrap.servers = "localhost:9092",
>                      topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   # Read the broadcast model on the worker.
>   i_model <- SparkR:::value(randomBr)
>   for (row in seq_len(nrow(x))) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     x[row, "value"] <- toJSON(y)
>   }
>   x
> }, schema)
> Every time Kafka streams are fetched, the dapply method creates a new runner
> thread and ships the variables again, which causes a large lag (~2s to ship
> the model) each time. I also tried without broadcast variables, but shipping
> the variables takes the same time. Can some other technique be applied to
> improve its performance?


