None.get on Redact in DataSourceScanExec

2017-07-13 Thread Russell Spitzer
Sorry if this is a double post; I wasn't sure if my forwarded message got
through.

I mentioned this in the RC2 note for Spark 2.2.0, and I'm now seeing it on
the official release. The Spark Cassandra Connector (SCC) integration tests
now fail whenever anything involves the CassandraSource being transformed
into the DataSourceScanExec SparkPlan.

https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L70

Utils.redact(SparkSession.getActiveSession.get.sparkContext.conf, text)
This leads to a None.get (full exception below).

This only seems to reproduce when I run from within sbt; running the same
code through the IntelliJ ScalaTest runner works fine. This makes me think
that something about how sbt loads the

org.apache.spark.scheduler.DAGSchedulerEventProcessLoop

class is keeping it from having access to getActiveSession.

I'm wondering if anyone else has run into this and found a workaround. I
saw a similar report posted at the end of this ticket:

https://issues.apache.org/jira/browse/SPARK-16599?focusedCommentId=16038185&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16038185

I have tried setting the active session, but that doesn't seem to reach the
thread that ends up calling getActiveSession.
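
For reference, this is roughly the shape of what I tried (a minimal sketch;
the keyspace and table names are placeholders, and as far as I can tell
setActiveSession only affects the calling thread):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("scc-it").getOrCreate()
SparkSession.setActiveSession(spark)  // sets the thread-local active session

spark.read
  .format("org.apache.spark.sql.cassandra")             // SCC data source
  .options(Map("keyspace" -> "test", "table" -> "kv"))  // placeholder names
  .load()
  .explain()  // plan printing goes through simpleString -> redact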


Thanks for your time,
Russ

The failure is

java.util.NoSuchElementException: None.get
[info]  at scala.None$.get(Option.scala:347)
[info]  at scala.None$.get(Option.scala:345)
[info]  at org.apache.spark.sql.execution.DataSourceScanExec$class.org
$apache$spark$sql$execution$DataSourceScanExec$$redact(DataSourceScanExec.scala:70)
[info]  at
org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:54)
[info]  at
org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:52)
[info]  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]  at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]  at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]  at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[info]  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
[info]  at
org.apache.spark.sql.execution.DataSourceScanExec$class.simpleString(DataSourceScanExec.scala:52)
[info]  at
org.apache.spark.sql.execution.RowDataSourceScanExec.simpleString(DataSourceScanExec.scala:75)
[info]  at
org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:349)
...

I can post the full exception, with the serialization attempt and the
ScalaTest wrapping, if anyone wants to see it, but it is quite long.


Re: Does mapWithState need checkpointing to be specified in Spark Streaming?

2017-07-13 Thread swetha kasireddy
OK. Thanks TD. Does stateSnapshots() bring a snapshot of the state of all
the keys managed by mapWithState, or just the state of the keys in the
current micro-batch? It's a bit confusing, because the following link says
it brings the state only for the keys seen in the current batch, while the
code documentation says it brings the state for all the keys. Also, if we
set a timeout, does stateSnapshots() give the expired key's state one last
time?

wordStream.mapWithState(stateSpec).stateSnapshots()

https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html
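
For context, here is roughly the shape of the code in question (a minimal
sketch; the socket source, the port, and the checkpoint path are
placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateSketch extends App {
  val conf = new SparkConf().setAppName("MapWithStateSketch").setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(1))
  // mapWithState needs a checkpoint directory, just like updateStateByKey.
  ssc.checkpoint("/tmp/checkpoint")

  val words = ssc.socketTextStream("localhost", 9999)
    .flatMap(_.split(" "))
    .map((_, 1))

  // Running count per word; State[Int] holds the accumulated value.
  val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }

  // Per the API docs, stateSnapshots() emits the state of all tracked keys
  // each batch, not only the keys seen in the current batch.
  words.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots().print()

  ssc.start()
  ssc.awaitTermination()
}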

On Thu, Jul 13, 2017 at 3:00 PM, Tathagata Das 
wrote:

> Yes. It does.
>
> On that note, Spark 2.2 (released couple of days ago) adds
> mapGroupsWithState in Structured Streaming.  That is like mapWithState on
> steroids. Just saying. :)
>
> On Thu, Jul 13, 2017 at 1:01 PM, SRK  wrote:
>
>> Hi,
>>
>> Do we need to specify checkpointing for mapWithState just like we do for
>> updateStateByKey?
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Does-mapWithState-need-checkpointing-
>> to-be-specified-in-Spark-Streaming-tp28858.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Does mapWithState need checkpointing to be specified in Spark Streaming?

2017-07-13 Thread Tathagata Das
Yes. It does.

On that note, Spark 2.2 (released a couple of days ago) adds
mapGroupsWithState in Structured Streaming. That is like mapWithState on
steroids. Just saying. :)
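
A minimal sketch of what that looks like (assuming a local socket source on
port 9999 and a console sink; mapGroupsWithState requires the update output
mode):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.GroupState

object MapGroupsWithStateSketch extends App {
  val spark = SparkSession.builder()
    .appName("MapGroupsWithStateSketch")
    .master("local[2]")
    .getOrCreate()
  import spark.implicits._

  // Each line from the socket is treated as one word.
  val words = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .as[String]

  // Running count per word, kept in GroupState[Long].
  val updateCount = (word: String, values: Iterator[String], state: GroupState[Long]) => {
    val newCount = state.getOption.getOrElse(0L) + values.size
    state.update(newCount)
    (word, newCount)
  }

  val counts = words.groupByKey(identity).mapGroupsWithState(updateCount)

  counts.writeStream
    .outputMode("update")
    .format("console")
    .start()
    .awaitTermination()
}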

On Thu, Jul 13, 2017 at 1:01 PM, SRK  wrote:

> Hi,
>
> Do we need to specify checkpointing for mapWithState just like we do for
> updateStateByKey?
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Does-mapWithState-need-
> checkpointing-to-be-specified-in-Spark-Streaming-tp28858.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Does mapWithState need checkpointing to be specified in Spark Streaming?

2017-07-13 Thread SRK
Hi,

Do we need to specify checkpointing for mapWithState just like we do for
updateStateByKey?

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-mapWithState-need-checkpointing-to-be-specified-in-Spark-Streaming-tp28858.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataFrameReader read from S3 org.apache.spark.sql.AnalysisException: Path does not exist

2017-07-13 Thread Sumona Routh
Yes, which is what I eventually did. I wanted to check whether there was
some "mode" option, similar to SaveMode for writers. It appears there
genuinely is no option for this, and it has to be handled by the client
through the exception flow.
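
For anyone who finds this later, here is roughly what I ended up doing (a
minimal sketch; the schema and path list are placeholders):

import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType

def readIfPresent(spark: SparkSession, schema: StructType, paths: Seq[String]): DataFrame =
  Try(spark.read.parquet(paths: _*)) match {
    case Success(df) => df
    case Failure(_: AnalysisException) =>
      // Path does not exist: treat it as "no data" and return an empty frame.
      spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    case Failure(other) => throw other
  }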

Thanks,
Sumona

On Wed, Jul 12, 2017 at 4:59 PM Yong Zhang  wrote:

> Can't you just catch that exception and return an empty dataframe?
>
>
> Yong
>
>
> --
> *From:* Sumona Routh 
> *Sent:* Wednesday, July 12, 2017 4:36 PM
> *To:* user
> *Subject:* DataFrameReader read from S3
> org.apache.spark.sql.AnalysisException: Path does not exist
>
> Hi there,
> I'm trying to read a list of paths from S3 into a dataframe for a window
> of time using the following:
>
> sparkSession.read.parquet(listOfPaths:_*)
>
> In some cases, the path may not be there because there is no data, which
> is an acceptable scenario.
> However, Spark throws an AnalysisException: Path does not exist. Is there
> an option I can set to tell it to gracefully return an empty dataframe if a
> particular path is missing? Looking at the spark code, there is an option
> checkFilesExist, but I don't believe that is set in the particular flow of
> code that I'm accessing.
>
> Thanks!
> Sumona
>
>


Re: underlying checkpoint

2017-07-13 Thread Bernard Jesop
Thank you, one of my mistakes was to think that show() was an action.

2017-07-13 17:52 GMT+02:00 Vadim Semenov :

> You need to trigger an action on that rdd to checkpoint it.
>
> ```
> scala>spark.sparkContext.setCheckpointDir(".")
>
> scala>val df = spark.createDataFrame(List(("Scala", 35), ("Python",
> 30), ("R", 15), ("Java", 20)))
> df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]
>
> scala> df.rdd.checkpoint()
>
> scala> df.rdd.isCheckpointed
> res2: Boolean = false
>
> scala> df.show()
> +--+---+
> |_1| _2|
> +--+---+
> | Scala| 35|
> |Python| 30|
> | R| 15|
> |  Java| 20|
> +--+---+
>
>
> scala> df.rdd.isCheckpointed
> res4: Boolean = false
>
> scala> df.rdd.count()
> res5: Long = 4
>
> scala> df.rdd.isCheckpointed
> res6: Boolean = true
> ```
>
> On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop 
> wrote:
>
>> Hi everyone, I just tried this simple program:
>>
>> import org.apache.spark.sql.SparkSession
>>
>> object CheckpointTest extends App {
>>   val spark = SparkSession
>>     .builder()
>>     .appName("Toto")
>>     .getOrCreate()
>>
>>   spark.sparkContext.setCheckpointDir(".")
>>   val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
>>   df.show()
>>   df.rdd.checkpoint()
>>   println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
>> }
>> But the result is still *"not checkpointed"*.
>> Do you have any idea why? (knowing that the checkpoint file is created)
>>
>> Best regards,
>> Bernard JESOP
>>
>
>


Re: underlying checkpoint

2017-07-13 Thread Vadim Semenov
You need to trigger an action on that rdd to checkpoint it.

```
scala>spark.sparkContext.setCheckpointDir(".")

scala>val df = spark.createDataFrame(List(("Scala", 35), ("Python",
30), ("R", 15), ("Java", 20)))
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.rdd.checkpoint()

scala> df.rdd.isCheckpointed
res2: Boolean = false

scala> df.show()
+--+---+
|_1| _2|
+--+---+
| Scala| 35|
|Python| 30|
| R| 15|
|  Java| 20|
+--+---+


scala> df.rdd.isCheckpointed
res4: Boolean = false

scala> df.rdd.count()
res5: Long = 4

scala> df.rdd.isCheckpointed
res6: Boolean = true
```
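
As a side note, since Spark 2.1 there is also a checkpoint method directly
on the Dataset, which is eager by default, so it does not need a separate
action (a minimal sketch):

```
scala> spark.sparkContext.setCheckpointDir(".")

scala> val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))

scala> val checkpointed = df.checkpoint()   // eager = true by default

scala> checkpointed.show()
```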

On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop 
wrote:

> Hi everyone, I just tried this simple program:
>
> import org.apache.spark.sql.SparkSession
>
> object CheckpointTest extends App {
>   val spark = SparkSession
>     .builder()
>     .appName("Toto")
>     .getOrCreate()
>
>   spark.sparkContext.setCheckpointDir(".")
>   val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
>   df.show()
>   df.rdd.checkpoint()
>   println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
> }
> But the result is still *"not checkpointed"*.
> Do you have any idea why? (knowing that the checkpoint file is created)
>
> Best regards,
> Bernard JESOP
>


underlying checkpoint

2017-07-13 Thread Bernard Jesop
Hi everyone, I just tried this simple program:

import org.apache.spark.sql.SparkSession

object CheckpointTest extends App {
  val spark = SparkSession
    .builder()
    .appName("Toto")
    .getOrCreate()

  spark.sparkContext.setCheckpointDir(".")
  val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
  df.show()
  df.rdd.checkpoint()
  println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
}
But the result is still *"not checkpointed"*.
Do you have any idea why? (knowing that the checkpoint file is created)

Best regards,
Bernard JESOP


Re: Spark 2.1.1: A bug in org.apache.spark.ml.linalg.* when using VectorAssembler.scala

2017-07-13 Thread Yan Facai
Hi, junjie.

As Nick said,
spark.ml indeed contains Vector, Vectors and VectorUDT by itself, see:
mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala:36:
sealed trait Vector extends Serializable

So, which bug did you find with VectorAssembler? Could you give more details?

On Thu, Jul 13, 2017 at 5:15 PM,  wrote:

> Dear Developers:
>
> Here is a bug in org.apache.spark.ml.linalg.*:
> Class Vector, Vectors are not included in org.apache.spark.ml.linalg.*,
> but they are used in VectorAssembler.scala as follows:
>
> *import *org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
>
> Therefore, bug was reported when I was using VectorAssembler.
>
> Since org.apache.spark.mllib.linalg.* contains the class {Vector,
> Vectors, VectorUDT}, I rewrote VectorAssembler.scala as
> XVectorAssembler.scala by mainly changing "*import *org.apache.spark.*ml*
> .linalg.{Vector, Vectors, VectorUDT}" to
> "*import *org.apache.spark.*mllib*.linalg.{Vector, Vectors, VectorUDT}"
>
> But bug occured as follows:
>
> " Column v must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
> but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce "
>
> Would you please help fix the bug?
>
> Thank you very much!
>
> Best regards
> --xiongjunjie



On Thu, Jul 13, 2017 at 6:08 PM, Nick Pentreath 
wrote:

> There are Vector classes under ml.linalg package - And VectorAssembler and
> other feature transformers all work with ml.linalg vectors.
>
> If you try to use mllib.linalg vectors instead you will get an error as
> the user defined type for SQL is not correct
>
>
> On Thu, 13 Jul 2017 at 11:23,  wrote:
>
>> Dear Developers:
>>
>> Here is a bug in org.apache.spark.ml.linalg.*:
>> Class Vector, Vectors are not included in org.apache.spark.ml.linalg.*,
>> but they are used in VectorAssembler.scala as follows:
>>
>> *import *org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
>>
>> Therefore, bug was reported when I was using VectorAssembler.
>>
>> Since org.apache.spark.mllib.linalg.* contains the class {Vector,
>> Vectors, VectorUDT}, I rewrote VectorAssembler.scala as
>> XVectorAssembler.scala by mainly changing "*import *org.apache.spark.*ml*
>> .linalg.{Vector, Vectors, VectorUDT}" to
>> "*import *org.apache.spark.*mllib*.linalg.{Vector, Vectors, VectorUDT}"
>>
>> But bug occured as follows:
>>
>> " Column v must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
>> but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce "
>>
>> Would you please help fix the bug?
>>
>> Thank you very much!
>>
>> Best regards
>> --xiongjunjie
>
>


[SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-13 Thread ????
Hi All,


I'm trying to execute Hive SQL on Spark SQL (also on the Spark Thrift
server). To mitigate data skew, we use "case when" to handle nulls. A
simplified example of the SQL follows:





SELECT a.col1 
FROM tbl1 a 
LEFT OUTER JOIN tbl2 b 
ON 
CASE 
WHEN a.col2 IS NULL 
THEN cast(rand(9)*1000 - 99 as string) 
ELSE 
a.col2 END 
= b.col3;



But I get the error:



== Physical Plan ==
org.apache.spark.sql.AnalysisException: nondeterministic expressions are only 
allowed in
Project, Filter, Aggregate or Window, found:
 (((CASE WHEN (a.`nav_tcdt` IS NULL) THEN CAST(((rand(9) * CAST(1000 AS 
DOUBLE)) - CAST(99L AS DOUBLE)) AS STRING) ELSE a.`nav_tcdt` END = 
c.`site_categ_id`) AND (CAST(a.`nav_tcd` AS INT) = 9)) AND (c.`cur_flag` = 1))
in operator Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN 
cast(((rand(9) * cast(1000 as double)) - cast(99 as double)) as string) 
ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26 as int) = 9)) && 
(cur_flag#77 = 1))
   ;;
GlobalLimit 10
+- LocalLimit 10
   +- Aggregate [date_id#7, CASE WHEN (cast(city_id#10 as string) IN 
(cast(19596 as string),cast(20134 as string),cast(10997 as string)) && 
nav_tcdt#25 RLIKE ^[0-9]+$) THEN city_id#10 ELSE nav_tpa_id#21 END], [date_id#7]
  +- Filter (date_id#7 = 2017-07-12)
 +- Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN 
cast(((rand(9) * cast(1000 as double)) - cast(99 as double)) as string) 
ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26 as int) = 9)) && 
(cur_flag#77 = 1))
:- SubqueryAlias a
:  +- SubqueryAlias tmp_lifan_trfc_tpa_hive
: +- CatalogRelation `tmp`.`tmp_lifan_trfc_tpa_hive`, 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [date_id#7, chanl_id#8L, 
pltfm_id#9, city_id#10, sessn_id#11, gu_id#12, nav_refer_page_type_id#13, 
nav_refer_page_value#14, nav_refer_tpa#15, nav_refer_tpa_id#16, 
nav_refer_tpc#17, nav_refer_tpi#18, nav_page_type_id#19, nav_page_value#20, 
nav_tpa_id#21, nav_tpa#22, nav_tpc#23, nav_tpi#24, nav_tcdt#25, nav_tcd#26, 
nav_tci#27, nav_tce#28, detl_refer_page_type_id#29, detl_refer_page_value#30, 
... 33 more fields]
+- SubqueryAlias c
   +- SubqueryAlias dim_site_categ_ext
  +- CatalogRelation `dw`.`dim_site_categ_ext`, 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [site_categ_skid#64L, 
site_categ_type#65, site_categ_code#66, site_categ_name#67, 
site_categ_parnt_skid#68L, site_categ_kywrd#69, leaf_flg#70L, sort_seq#71L, 
site_categ_srch_name#72, vsbl_flg#73, delet_flag#74, etl_batch_id#75L, 
updt_time#76, cur_flag#77, bkgrnd_categ_skid#78L, bkgrnd_categ_id#79L, 
site_categ_id#80, site_categ_parnt_id#81]




Does Spark SQL not support the "case when" syntax in a JOIN condition?
Additionally, my Spark version is 2.2.0.
Any help would be greatly appreciated.
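
For reference, a rough, untested sketch of the workaround we are
considering: compute the salt in a projection first, then join on the
resulting column (same simplified table and column names as above; assumes
an existing SparkSession named spark):

val salted = spark.sql("""
  SELECT col1,
         CASE WHEN col2 IS NULL
              THEN CAST(rand(9) * 1000 - 99 AS STRING)
              ELSE col2
         END AS join_key
  FROM tbl1
""")
salted.createOrReplaceTempView("tbl1_salted")

spark.sql("""
  SELECT a.col1
  FROM tbl1_salted a
  LEFT OUTER JOIN tbl2 b
    ON a.join_key = b.col3
""").show()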

Re: Spark 2.1.1: A bug in org.apache.spark.ml.linalg.* when using VectorAssembler.scala

2017-07-13 Thread Nick Pentreath
There are Vector classes under ml.linalg package - And VectorAssembler and
other feature transformers all work with ml.linalg vectors.

If you try to use mllib.linalg vectors instead you will get an error as the
user defined type for SQL is not correct
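
For example, something along these lines works (a minimal sketch with
made-up data; note that the Vectors used to build the input column come
from ml.linalg, not mllib.linalg):

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object AssemblerSketch extends App {
  val spark = SparkSession.builder()
    .appName("AssemblerSketch")
    .master("local[*]")
    .getOrCreate()

  // Numeric columns plus an existing ml.linalg vector column.
  val df = spark.createDataFrame(Seq(
    (1.0, 2.0, Vectors.dense(0.1, 0.2)),
    (3.0, 4.0, Vectors.dense(0.3, 0.4))
  )).toDF("x", "y", "v")

  // VectorAssembler accepts numeric, boolean and ml.linalg vector columns;
  // feeding it an mllib.linalg vector column is what produces the
  // "Column v must be of type ...ml.linalg.VectorUDT" error.
  val assembler = new VectorAssembler()
    .setInputCols(Array("x", "y", "v"))
    .setOutputCol("features")

  assembler.transform(df).show(false)
}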


On Thu, 13 Jul 2017 at 11:23,  wrote:

> Dear Developers:
>
> Here is a bug in org.apache.spark.ml.linalg.*:
> Class Vector, Vectors are not included in org.apache.spark.ml.linalg.*,
> but they are used in VectorAssembler.scala as follows:
>
> *import *org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
>
> Therefore, bug was reported when I was using VectorAssembler.
>
> Since org.apache.spark.mllib.linalg.* contains the class {Vector,
> Vectors, VectorUDT}, I rewrote VectorAssembler.scala as
> XVectorAssembler.scala by mainly changing "*import 
> *org.apache.spark.*ml*.linalg.{Vector,
> Vectors, VectorUDT}" to
> "*import *org.apache.spark.*mllib*.linalg.{Vector, Vectors, VectorUDT}"
>
> But bug occured as follows:
>
> " Column v must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
> but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce "
>
> Would you please help fix the bug?
>
> Thank you very much!
>
> Best regards
> --xiongjunjie


Spark 2.1.1: A bug in org.apache.spark.ml.linalg.* when using VectorAssembler.scala

2017-07-13 Thread xiongjunjie
Dear Developers:

Here is a bug in org.apache.spark.ml.linalg.*:
The classes Vector and Vectors are not included in org.apache.spark.ml.linalg.*,
but they are used in VectorAssembler.scala as follows:

import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}

Therefore, a bug was reported when I was using VectorAssembler.

Since org.apache.spark.mllib.linalg.* contains the classes {Vector, Vectors,
VectorUDT}, I rewrote VectorAssembler.scala as XVectorAssembler.scala by
mainly changing "import org.apache.spark.ml.linalg.{Vector, Vectors,
VectorUDT}" to
"import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}"

But a bug occurred as follows:

" Column v must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 
but was actually org.apache.spark.mllib.linalg.VectorUDT@f71b0bce "

Would you please help fix the bug?

Thank you very much!

Best regards
--xiongjunjie

how to identify the alive master spark via Zookeeper ?

2017-07-13 Thread marina.brunel
Hello,




In our project we have a Spark cluster with 2 masters and 4 workers, and
ZooKeeper decides which master is alive.
We have a problem with our reverse proxy for the Spark Web UI. The reverse
proxy redirects to the master whose IP address was set in the initial
configuration, but if ZooKeeper elects the other master, the Spark Web UI
becomes unreachable because the master's IP address has changed.
We want to find out dynamically which master is elected, every time.
We searched the Internet for a way to ask ZooKeeper which master is alive,
but we didn't find anything. It would be possible with confd to watch a
property for changes, but no such property is saved in ZooKeeper; nothing is
logged under the /spark folder in ZooKeeper.
Why doesn't Spark write a property to ZooKeeper indicating which IP address
or hostname has been elected? In the class ZooKeeperLeaderElectionAgent.scala
you log which master is elected, but perhaps it would also be a good idea to
publish a property to ZooKeeper indicating the host.

We already asked on the ZooKeeper user mailing list, and they said:
"This question may be better suited for the Spark mailing lists as Zookeeper
doesn't really "decide" which master is alive but rather provides a
mechanism for the application to make the correct decision."

So we think we are not alone with this type of problem, but we can't find
anything on the Internet.

Can you help us solve this problem?
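
For now the only workaround we can imagine is to poll each master's web UI
from the reverse proxy side; a rough sketch, assuming (still to be verified)
that the standalone master's /json endpoint reports a status of ALIVE or
STANDBY:

import scala.io.Source
import scala.util.Try

// Our two candidate masters (placeholder hostnames).
val masters = Seq("http://master1:8080", "http://master2:8080")

// Crude check: a real implementation should parse the JSON instead of
// searching for the ALIVE marker in the raw text.
def aliveMaster: Option[String] =
  masters.find { url =>
    Try(Source.fromURL(s"$url/json").mkString).toOption.exists(_.contains("ALIVE"))
  }

println(aliveMaster.getOrElse("no alive master found"))
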
Regards,
Marina Brunel
Ingénieur d'étude confirmé
Orange Applications for Business
ORANGE/OBS/SCE/OAB SUBS/MTM/SM2M

Mobile : +33 7 84 14 52 05 

marina.bru...@orange.com

3, allée de Beaulieu 35700 Rennes
www.orange.com


_

This message and its attachments may contain confidential or privileged 
information that may be protected by law;
they should not be distributed, used or copied without authorisation.
If you have received this email in error, please notify the sender and delete 
this message and its attachments.
As emails may be altered, Orange is not liable for messages that have been 
modified, changed or falsified.
Thank you.



UnpicklingError while using spark streaming

2017-07-13 Thread lovemoon



Spark 2.1.1 & Python 2.7.11

I want to union another RDD in DStream.transform(), like below:

sc = SparkContext()
ssc = StreamingContext(sc, 1)

init_rdd = sc.textFile('file:///home/zht/PycharmProjects/test/text_file.txt')

lines = ssc.socketTextStream('localhost', )

lines = lines.transform(lambda rdd: rdd.union(init_rdd))

lines.pprint()

ssc.start()
ssc.awaitTermination()


And I get an error about pickling:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, 
in main
process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, 
in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
268, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1338, in 
takeUpToNumLeft
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
144, in load_stream
yield self._read_with_length(stream)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
169, in _read_with_length
return self.loads(obj)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
454, in loads
return pickle.loads(obj)
UnpicklingError: unpickling stack underflow


text_file.txt only includes one letter:

 a