Re: Handling nulls in vector columns is non-trivial

2017-06-24 Thread Timur Shenkao
Hi Franklyn,

I had the same problem as you with vectors & Maps. I tried:
1) UDFs --> cumbersome and difficult to maintain. One has to re-write /
re-implement UDFs + extensive docs should be provided for colleagues +
something weird may happen when you migrate to a new Spark version (a rough
sketch is shown below the list)
2) RDD / DataFrame row-by-row approach --> slow but reliable &
understandable, and you can fill in & modify rows as you like
3) Spark SQL / DataFrame "magic tricks" approach --> it's like the code you
provided: create DataFrames, join them, drop & modify them somehow. Generally
speaking, it should be faster than approach #2
4) Encoders --> raw approach; basic types only. I finally chose this
approach, but I had to reformulate my problem in terms of basic data types
and POJO classes
5) Catalyst API Expressions --> I believe this is the right way, but it
requires that you & your colleagues dive deep into Spark internals
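
For illustration, approach #1 in pyspark boils down to something like the
following (a rough sketch only: the column name 'feature' and the vector size
10 are placeholders, and it assumes an existing DataFrame df):

    from pyspark.ml.linalg import SparseVector, VectorUDT
    from pyspark.sql import functions as F

    # Replace null entries with an all-zero sparse vector of the same size.
    fill_empty = F.udf(lambda v: v if v is not None else SparseVector(10, {}),
                       VectorUDT())
    df = df.withColumn('feature', fill_empty('feature'))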

Sincerely yours, Timur

On Fri, Jun 23, 2017 at 12:55 PM, Franklyn D'souza <
franklyn.dso...@shopify.com> wrote:

> As a reference, this is roughly what is required to coalesce a vector
> column in pyspark (the sketch assumes an active SparkSession named spark
> and a single vector column named 'feature'):
>
> from pyspark.ml.linalg import SparseVector, VectorUDT
> from pyspark.sql import functions as F
> from pyspark.sql.types import StructType, StructField
>
> schema = StructType([StructField('feature', VectorUDT())])
> df = spark.createDataFrame([(SparseVector(10, {1: 44}),), (None,),
>     (SparseVector(10, {1: 23}),), (None,), (SparseVector(10, {1: 35}),)],
>     schema=schema)
> empty_vector = spark.createDataFrame([(SparseVector(10, {}),)],
>     ['_empty_vector'])
> df = df.crossJoin(empty_vector)
> df = df.withColumn('feature', F.coalesce('feature', '_empty_vector'))
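>
> (The cross join against the single-row DataFrame attaches the all-zero
> vector to every row, so that coalesce can fall back to it wherever
> 'feature' is null.)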
>
>
>
> On Thu, Jun 22, 2017 at 11:54 AM, Franklyn D'souza <
> franklyn.dso...@shopify.com> wrote:
>
>> We've developed Scala UDFs internally to address some of these issues, and
>> we'd love to upstream them back to Spark. We're just trying to figure out
>> what vector support looks like on the roadmap.
>>
>> Would it be best to put this functionality into the Imputer or
>> VectorAssembler, or to give it more first-class support in DataFrames by
>> having it work with the lit column expression?
>>
>> On Wed, Jun 21, 2017 at 9:30 PM, Franklyn D'souza <
>> franklyn.dso...@shopify.com> wrote:
>>
>>> The documentation states that `The input columns should be of
>>> DoubleType or FloatType.`, so I don't think that is what I'm looking for.
>>> Also, in general the API around vectors is quite lacking, especially on
>>> the pyspark side.
>>>
>>> Very common vector operations like addition, subtraction and dot
>>> products can't be performed. I'm wondering what the direction is with
>>> vector support in Spark.
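>>>
>>> For illustration, the kind of workaround currently needed for a dot
>>> product is a Python UDF along these lines (a rough sketch only; 'u' and
>>> 'v' are placeholder names for two vector columns of an existing df):
>>>
>>>     from pyspark.sql import functions as F
>>>     from pyspark.sql.types import DoubleType
>>>
>>>     # Deserializes every vector into Python just to call .dot() on it.
>>>     dot = F.udf(lambda u, v: float(u.dot(v)), DoubleType())
>>>     df = df.withColumn('u_dot_v', dot('u', 'v'))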
>>>
>>> On Wed, Jun 21, 2017 at 9:19 PM, Maciej Szymkiewicz <
>>> mszymkiew...@gmail.com> wrote:
>>>
 Since 2.2 there is Imputer:

 https://github.com/apache/spark/blob/branch-2.2/examples/src/main/python/ml/imputer_example.py

 which should at least partially address the problem.
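 
 For reference, a minimal Imputer sketch (assuming Spark >= 2.2, an active
 SparkSession named spark, and plain numeric columns "a" and "b" as
 placeholders):
 
     from pyspark.ml.feature import Imputer
 
     df = spark.createDataFrame([(1.0, float("nan")), (2.0, 4.0),
                                 (float("nan"), 3.0)], ["a", "b"])
     imputer = Imputer(inputCols=["a", "b"], outputCols=["a_out", "b_out"])
     imputer.fit(df).transform(df).show()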

 On 06/22/2017 03:03 AM, Franklyn D'souza wrote:
 > I just wanted to highlight some of the rough edges around using
 > vectors in columns in dataframes.
 >
 > If there is a null in a DataFrame column containing vectors, pyspark ML
 > models like logistic regression will completely fail.
 >
 > However, from what I've read there is no good way to fill in these
 > nulls with empty vectors.
 >
 > It's not possible to create a literal vector column expression and
 > coalesce it with the column from pyspark.
 >
 > So we're left with writing a Python UDF which does this coalesce, which
 > is really inefficient on large datasets and becomes a bottleneck for
 > ML pipelines working with real-world data.
 >
 > I'd like to know how other users are dealing with this and what plans
 > there are to extend vector support for dataframes.
 >
 > Thanks!
 >
 > Franklyn


>>>
>>
>


Container exited with a non-zero exit code 1

2017-06-24 Thread Link Qian

Any suggestions from the Spark dev group?


From: Link Qian 
Sent: Friday, June 23, 2017 9:58 AM
To: u...@spark.apache.org
Subject: Container exited with a non-zero exit code 1


Hello,


I submit a Spark job to a YARN cluster with the spark-submit command. The
environment is CDH 5.4 with Spark 1.3.0 and has 6 compute nodes with 64G of
memory per node. YARN sets a 16G maximum of memory for every container. The job
requests 6 executors with 8G of memory each, and 8G for the driver. However, I
always get the errors below after trying to submit the job several times. Any help?


--- here are the error logs of the Application Master for the job ---


17/06/22 15:18:44 INFO yarn.YarnAllocator: Completed container 
container_1498115278902_0001_02_13 (state: COMPLETE, exit status: 1)
17/06/22 15:18:44 INFO yarn.YarnAllocator: Container marked as failed: 
container_1498115278902_0001_02_13. Exit status: 1. Diagnostics: Exception 
from container-launch.
Container id: container_1498115278902_0001_02_13
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1



Here are the YARN application logs of the job.

LogLength:2611
Log Contents:
17/06/22 15:18:09 INFO executor.CoarseGrainedExecutorBackend: Registered signal 
handlers for [TERM, HUP, INT]
17/06/22 15:18:10 INFO spark.SecurityManager: Changing view acls to: yarn,root
17/06/22 15:18:10 INFO spark.SecurityManager: Changing modify acls to: yarn,root
17/06/22 15:18:10 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(yarn, root); users 
with modify permissions: Set(yarn, root)
17/06/22 15:18:10 INFO slf4j.Slf4jLogger: Slf4jLogger started
17/06/22 15:18:10 INFO Remoting: Starting remoting
17/06/22 15:18:10 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://driverPropsFetcher@dn006:45701]
17/06/22 15:18:10 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://driverPropsFetcher@dn006:45701]
17/06/22 15:18:10 INFO util.Utils: Successfully started service 
'driverPropsFetcher' on port 45701.
17/06/22 15:18:40 WARN security.UserGroupInformation: 
PriviledgedActionException as:root (auth:SIMPLE) 
cause:java.util.concurrent.TimeoutException: Futures timed out after [30 
seconds]
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1684)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:139)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:235)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 
seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:155)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
... 4 more


 a snippet of RM log for the job -

2017-06-22 15:18:41,586 INFO 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Released 
container 

issue about the windows slice of stream

2017-06-24 Thread Fei Shao
Hi all,
I found an issue with the window slicing of DStreams.
My code is:


import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(1))  // conf is an existing SparkConf

val content = ssc.socketTextStream("ip", port)  // host and port are placeholders
content.countByValueAndWindow(Seconds(2), Seconds(8)).print()
The key point is that the slide duration is greater than the window duration.
I checked the printed output, and the result was wrong:
the stream was being sliced into windows incorrectly.
Can I open a JIRA please?


thanks
Fei Shao