Re: How to limit search range without using subquery when query SQL DB via JDBC?

2016-05-13 Thread Mich Talebzadeh
Well I don't know about Postgres, but you can limit the number of columns
and rows fetched via JDBC at the source rather than loading and filtering them
in Spark

val c = HiveContext.load("jdbc",
Map("url" -> _ORACLEserver,
"dbtable" -> "(SELECT to_char(CHANNEL_ID) AS CHANNEL_ID, CHANNEL_DESC FROM
sh.channels where ROWNUM <= 1)",
"user" -> _username,
"password" -> _password))

or in your case

"dbtable" -> "(SELECT COUNT(1) FROM sh.channels where ROWNUM <= 1)",

c.show()
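
For the Postgres/PySpark setup in the original question, a rough equivalent of
the same push-down idea might look like this (URL and credentials are
placeholders; Postgres needs an alias on the derived table):

c = sqlContext.read.format("jdbc").options(
    url="jdbc:postgresql://host:5432/mydb",
    dbtable="(SELECT * FROM mytable "
            "WHERE t >= 1451577600 AND t <= 1454256000) AS tmp",
    user="...",
    password="...").load()
c.show()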

HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 14 May 2016 at 04:56, Jyun-Fan Tsai  wrote:

> I try to load some rows from a big SQL table.  Here is my code:
>
> ===
> jdbcDF = sqlContext.read.format("jdbc").options(
>   url="jdbc:postgresql://...",
>   dbtable="mytable",
>   partitionColumn="t",
>   lowerBound=1451577600,
>   upperBound=1454256000,
>   numPartitions=1).load()
> print(jdbcDF.count())
> ===
>
> The code runs very slowly because Spark tries to load the whole table.
> I know there is a solution that uses a subquery.  I can use:
>
> dbtable="(SELECT * FROM mytable WHERE t>=1451577600 AND t<= 1454256000)
> tmp".
> However, it's still slow because the subquery creates a temp table.
>
> I would like to know how I can specify WHERE filters so I don't need
> to load the whole table.
>
> From spark source code I guess the filter in JDBCRelation is the
> solution I'm looking for.  However, I don't know how to create a
> filter and pass it to jdbc driver.
> ===
>
> https://github.com/apache/spark/blob/40ed2af587cedadc6e5249031857a922b3b234ca/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
> ===
>
>
>
> --
> Thanks for help,
> Jyun-Fan Tsai
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Pyspark accumulator

2016-05-13 Thread Abi
On Tue, May 10, 2016 at 2:24 PM, Abi  wrote:

> 1. How come pyspark does not provide the localValue function like Scala?
>
> 2. Why is pyspark more restrictive than Scala?


Re: pyspark mapPartitions()

2016-05-13 Thread Abi
On Tue, May 10, 2016 at 2:20 PM, Abi  wrote:

> Is there any example of this? I want to see how you write the
> iterable example


How to limit search range without using subquery when query SQL DB via JDBC?

2016-05-13 Thread Jyun-Fan Tsai
I try to load some rows from a big SQL table.  Here is my code:

===
jdbcDF = sqlContext.read.format("jdbc").options(
  url="jdbc:postgresql://...",
  dbtable="mytable",
  partitionColumn="t",
  lowerBound=1451577600,
  upperBound=1454256000,
  numPartitions=1).load()
print(jdbcDF.count())
===

The code runs very slowly because Spark tries to load the whole table.
I know there is a solution that uses a subquery.  I can use:

dbtable="(SELECT * FROM mytable WHERE t>=1451577600 AND t<= 1454256000) tmp".
However, it's still slow because the subquery creates a temp table.

I would like to know how I can specify WHERE filters so I don't need
to load the whole table.

From the Spark source code I guess the filter in JDBCRelation is the
solution I'm looking for.  However, I don't know how to create a
filter and pass it to jdbc driver.
===
https://github.com/apache/spark/blob/40ed2af587cedadc6e5249031857a922b3b234ca/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
===
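
One way to pass WHERE clauses straight through to the database without a
subquery is the predicates argument of DataFrameReader.jdbc; each predicate
string becomes the WHERE clause of one partition's query, so the filtering
runs in Postgres. A minimal sketch for the example above (connection details
and the split points are placeholders):

predicates = [
    "t >= 1451577600 AND t < 1452916800",
    "t >= 1452916800 AND t <= 1454256000",
]
jdbcDF = sqlContext.read.jdbc(
    url="jdbc:postgresql://...",
    table="mytable",
    predicates=predicates,
    properties={"user": "...", "password": "..."})
print(jdbcDF.count())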



-- 
Thanks for help,
Jyun-Fan Tsai

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.4.1 + Kafka 0.8.2 with Kerberos

2016-05-13 Thread Mail.com
Hi All,

I am trying to get Spark 1.4.1 (Java) to work with Kafka 0.8.2 in a
Kerberos-enabled cluster (HDP 2.3.2).

Is there any document I can refer to?

Thanks,
Pradeep 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



support for golang

2016-05-13 Thread Sourav Chakraborty
Folks,
  Was curious to find out if anybody ever considered/attempted to support
golang with Spark.

-Thanks
Sourav


broadcast variable not picked up

2016-05-13 Thread abi
def kernel(arg):
    input = broadcast_var.value + 1
    # some processing with input

def foo():
  
  
  broadcast_var = sc.broadcast(var)
  rdd.foreach(kernel)


def main():
   #something


In this code, I get the following error:
NameError: global name 'broadcast_var ' is not defined


Any ideas on how to fix it?
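
A common fix is to make the broadcast handle visible to the function that uses
it, for example by passing it in explicitly. A minimal sketch, assuming sc,
rdd and var exist as in the snippet above:

def kernel(row, broadcast_var):
    # Executors read the broadcast value through the handle they were given.
    value = broadcast_var.value + 1
    # ... some processing with value and row ...

def foo(sc, rdd, var):
    broadcast_var = sc.broadcast(var)
    # The lambda closes over the broadcast handle, so it ships to executors.
    rdd.foreach(lambda row: kernel(row, broadcast_var))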




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/broadcast-variable-not-picked-up-tp26955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: System memory 186646528 must be at least 4.718592E8.

2016-05-13 Thread satish saley
Thank you. Looking at the source code helped :)

I set spark.testing.memory to 512 MB and it worked :)

private def getMaxMemory(conf: SparkConf): Long = {
  val systemMemory = conf.getLong("spark.testing.memory",
Runtime.getRuntime.maxMemory)
  val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  val minSystemMemory = reservedMemory * 1.5
  if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +


On Fri, May 13, 2016 at 12:51 PM, Ted Yu  wrote:

> Here is related code:
>
>   val executorMemory = conf.*getSizeAsBytes*("spark.executor.memory")
>   if (executorMemory < minSystemMemory) {
> throw new IllegalArgumentException(s"Executor memory
> $executorMemory must be at least " +
>
> On Fri, May 13, 2016 at 12:47 PM, satish saley 
> wrote:
>
>> Hello,
>> I am running
>> https://github.com/apache/spark/blob/branch-1.6/examples/src/main/python/pi.py
>>  example,
>> but facing following exception
>>
>> What is the unit of memory pointed out in the error?
>>
>> Following are configs
>>
>> --master
>>
>> local[*]
>>
>> --deploy-mode
>>
>> client
>>
>> --name
>>
>> PysparkExample
>>
>> --py-files
>>
>> py4j-0.9-src.zip,pyspark.zip,
>>
>> --verbose
>>
>>
>> pi.py/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> None.org.apache.spark.api.java.JavaSparkContext.
>>
>> : java.lang.IllegalArgumentException: System memory 186646528 must be at
>> least 4.718592E8. Please use a larger heap size.
>>
>> at
>> org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:193)
>>
>> at
>> org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:175)
>>
>> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:354)
>>
>> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
>>
>> at
>> org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
>>
>> at org.apache.spark.SparkContext.(SparkContext.scala:457)
>>
>> at
>> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:59)
>>
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>>
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>
>> at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
>>
>> at
>> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>>
>> at py4j.Gateway.invoke(Gateway.java:214)
>>
>> at
>> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
>>
>> at
>> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
>>
>> at py4j.GatewayConnection.run(GatewayConnection.java:209)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>
>


Re: strange behavior when I chain data frame transformations

2016-05-13 Thread Andy Davidson
Hi Ted


It seems really strange. It seems like in the version where I used 2 data
frames Spark added “as(tag)”. (Which is really nice.)

Odd that I got different behavior

Is this a bug?

Kind regards

Andy



From:  Ted Yu 
Date:  Friday, May 13, 2016 at 12:38 PM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: strange behavior when I chain data frame transformations

> In the structure shown, tag is under element.
> 
> I wonder if that was a factor.
> 
> On Fri, May 13, 2016 at 11:49 AM, Andy Davidson
>  wrote:
>> I am using spark-1.6.1.
>> 
>> I create a data frame from a very complicated JSON file. I would assume that
>> the query planner would treat both versions of my transformation chains the same
>> way.
>> 
>> 
>> // org.apache.spark.sql.AnalysisException: Cannot resolve column name "tag"
>> among (actor, body, generator, pip, id, inReplyTo, link, object, objectType,
>> postedTime, provider, retweetCount, twitter_entities, verb);
>> 
>> // DataFrame emptyDF = rawDF.selectExpr("*", “pip.rules.tag")
>> 
>> // .filter(rawDF.col(tagCol).isNull());
>> 
>> DataFrame emptyDF1 = rawDF.selectExpr("*", “pip.rules.tag");
>> 
>> DataFrame emptyDF =  emptyDF1.filter(emptyDF1.col(“tag").isNull());
>> 
>> 
>> 
>> Here is the schema for the gnip structure
>> 
>>  |-- pip: struct (nullable = true)
>> 
>>  ||-- _profile: struct (nullable = true)
>> 
>>  |||-- topics: array (nullable = true)
>> 
>>  ||||-- element: string (containsNull = true)
>> 
>>  ||-- rules: array (nullable = true)
>> 
>>  |||-- element: struct (containsNull = true)
>> 
>>  ||||-- tag: string (nullable = true)
>> 
>> 
>> 
>> Is this a bug ?
>> 
>> 
>> 
>> Andy
>> 
>> 
> 




Re: SQLContext and HiveContext parse a query string differently ?

2016-05-13 Thread Hao Ren
Basically, I want to run the following query:

select 'a\'b', cast(null as Array)

However, neither HiveContext nor SQLContext can execute it without an
exception.

I have tried

sql(select 'a\'b', cast(null as Array))

and

df.selectExpr("'a\'b'", "cast(null as Array)")

Neither of them works.

From the exceptions, I find the query is parsed differently.
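
One way to sidestep the SQL-string parsing differences entirely is to build the
same expressions through the DataFrame API instead of a SQL string. A minimal
PySpark sketch of that workaround (df is any DataFrame, such as the one in the
reproduction code quoted below):

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

df.select(F.lit("a'b"),
          F.lit(None).cast(ArrayType(StringType()))).show()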



On Fri, May 13, 2016 at 8:00 AM, Yong Zhang  wrote:

> Not sure what you mean? You want to have exactly one query running fine
> in both sqlContext and HiveContext? The query parsers are different, so why do
> you want to have this feature? Do I understand your question correctly?
>
> Yong
>
> --
> Date: Thu, 12 May 2016 13:09:34 +0200
> Subject: SQLContext and HiveContext parse a query string differently ?
> From: inv...@gmail.com
> To: user@spark.apache.org
>
>
> HI,
>
> I just want to figure out why the two contexts behave differently even
> on a simple query.
> In a nutshell, I have a query in which there is a String containing a single
> quote and a cast to Array/Map.
> I have tried all the combinations of different types of SQL context and query
> call APIs (sql, df.select, df.selectExpr).
> I can't find one that rules them all.
>
> Here is the code for reproducing the problem.
>
> -
>
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.{SparkConf, SparkContext}
>
> object Test extends App {
>
>   val sc  = new SparkContext("local[2]", "test", new SparkConf)
>   val hiveContext = new HiveContext(sc)
>   val sqlContext  = new SQLContext(sc)
>
>   val context = hiveContext
>   //  val context = sqlContext
>
>   import context.implicits._
>
>   val df = Seq((Seq(1, 2), 2)).toDF("a", "b")
>   df.registerTempTable("tbl")
>   df.printSchema()
>
>   // case 1
>   context.sql("select cast(a as array<string>) from tbl").show()
>   // HiveContext => org.apache.spark.sql.AnalysisException: cannot recognize 
> input near 'array' '<' 'string' in primitive type specification; line 1 pos 17
>   // SQLContext => OK
>
>   // case 2
>   context.sql("select 'a\\'b'").show()
>   // HiveContext => OK
>   // SQLContext => failure: ``union'' expected but ErrorToken(unclosed string 
> literal) found
>
>   // case 3
>   df.selectExpr("cast(a as array<string>)").show() // OK with HiveContext and 
> SQLContext
>
>   // case 4
>   df.selectExpr("'a\\'b'").show() // HiveContext, SQLContext => failure: end 
> of input expected
> }
>
> -
>
> Any clarification / workaround is high appreciated.
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>



-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: System memory 186646528 must be at least 4.718592E8.

2016-05-13 Thread Ted Yu
Here is related code:

  val executorMemory = conf.*getSizeAsBytes*("spark.executor.memory")
  if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory
$executorMemory must be at least " +

On Fri, May 13, 2016 at 12:47 PM, satish saley 
wrote:

> Hello,
> I am running
> https://github.com/apache/spark/blob/branch-1.6/examples/src/main/python/pi.py
>  example,
> but facing following exception
>
> What is the unit of memory pointed out in the error?
>
> Following are configs
>
> --master
>
> local[*]
>
> --deploy-mode
>
> client
>
> --name
>
> PysparkExample
>
> --py-files
>
> py4j-0.9-src.zip,pyspark.zip,
>
> --verbose
>
>
> pi.py/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> None.org.apache.spark.api.java.JavaSparkContext.
>
> : java.lang.IllegalArgumentException: System memory 186646528 must be at
> least 4.718592E8. Please use a larger heap size.
>
> at
> org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:193)
>
> at
> org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:175)
>
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:354)
>
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
>
> at
> org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
>
> at org.apache.spark.SparkContext.(SparkContext.scala:457)
>
> at
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:59)
>
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
>
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>
> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
>
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>
> at py4j.Gateway.invoke(Gateway.java:214)
>
> at
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
>
> at
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
>
> at py4j.GatewayConnection.run(GatewayConnection.java:209)
>
> at java.lang.Thread.run(Thread.java:745)
>


Spark job fails when using checkpointing if a class change in the job

2016-05-13 Thread map reduced
Hi,

I have my application jar sitting in HDFS which defines a long-running Spark
Streaming job, and I am using a checkpoint dir that is also in HDFS. Every time
I make any changes to the job, I delete that jar and upload a new one.

Now if I upload a new jar and delete the checkpoint directory, it works fine.
But if I don't delete the checkpoint directory, I get an error like:

timestamp="2016-05-13T18:49:47,887+",level="WARN",threadName="main",logger="org.apache.spark.streaming.CheckpointReader",message="Error
reading checkpoint from file
hdfs://myCheckpoints/application-1/checkpoint-1463165355000",exception=*"java.io.InvalidClassException:
some.package.defined.here.ConcreteClass; local class incompatible: stream
classdesc serialVersionUID = -7808345595732501156, local class
serialVersionUID = 1574855058137843618*

I have changed the 'ConcreteClass' from my last implementation and that's
what's causing the issue.

I have 2 main questions:

   1. *How to fix this?* I know adding private static long serialVersionUID
   = 1113799434508676095L; might fix it, but I don't want to add this to
   all classes, since any class can change between current and next version.
   Anything better?
   2. *What all does the checkpoint directory store?* Does it store all classes
   from the previous jar? Or just their names and serialVersionUIDs? This
   doesn't give much detail on the internals of checkpointing. Its size is only ~6 MB.

Appreciate any help.

Just asked this question on:
https://stackoverflow.com/questions/37217738/spark-job-fails-when-using-checkpointing-if-a-class-change-in-the-job

Thanks,

KP


System memory 186646528 must be at least 4.718592E8.

2016-05-13 Thread satish saley
Hello,
I am running
https://github.com/apache/spark/blob/branch-1.6/examples/src/main/python/pi.py
example,
but facing following exception

What is the unit of memory pointed out in the error?

Following are configs

--master

local[*]

--deploy-mode

client

--name

PysparkExample

--py-files

py4j-0.9-src.zip,pyspark.zip,

--verbose


pi.py/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.

: java.lang.IllegalArgumentException: System memory 186646528 must be at
least 4.718592E8. Please use a larger heap size.

at
org.apache.spark.memory.UnifiedMemoryManager$.getMaxMemory(UnifiedMemoryManager.scala:193)

at
org.apache.spark.memory.UnifiedMemoryManager$.apply(UnifiedMemoryManager.scala:175)

at org.apache.spark.SparkEnv$.create(SparkEnv.scala:354)

at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)

at
org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)

at org.apache.spark.SparkContext.<init>(SparkContext.scala:457)

at
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)

at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:422)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)

at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)

at py4j.Gateway.invoke(Gateway.java:214)

at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)

at
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)

at py4j.GatewayConnection.run(GatewayConnection.java:209)

at java.lang.Thread.run(Thread.java:745)


Re: Executor memory requirement for reduceByKey

2016-05-13 Thread Sung Hwan Chung
Ok, so that worked flawlessly after I upped the number of partitions to 400
from 40.

Thanks!
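
For reference, the partition count can be raised either by repartitioning
before the shuffle or by passing it to reduceByKey directly; a minimal PySpark
sketch, with sc assumed to be an existing SparkContext and the pair RDD a
stand-in for the real dataset:

from operator import add

pairs = sc.parallelize([("k%d" % i, 1) for i in range(1000)])

# Either repartition first ...
counts = pairs.repartition(400).reduceByKey(add)

# ... or give reduceByKey the target number of partitions.
counts = pairs.reduceByKey(add, numPartitions=400)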

On Fri, May 13, 2016 at 7:28 PM, Sung Hwan Chung 
wrote:

> I'll try that, as of now I have a small number of partitions in the order
> of 20~40.
>
> It would be great if there's some documentation on the memory requirement
> wrt the number of keys and the number of partitions per executor (i.e., the
> Spark's internal memory requirement outside of the user space).
>
> Otherwise, it's like shooting in the dark.
>
> On Fri, May 13, 2016 at 7:20 PM, Ted Yu  wrote:
>
>> Have you taken a look at SPARK-11293 ?
>>
>> Consider using repartition to increase the number of partitions.
>>
>> FYI
>>
>> On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung <
>> coded...@cs.stanford.edu> wrote:
>>
>>> Hello,
>>>
>>> I'm using Spark version 1.6.0 and have trouble with memory when trying
>>> to do reducebykey on a dataset with as many as 75 million keys. I.e. I get
>>> the following exception when I run the task.
>>>
>>> There are 20 workers in the cluster. It is running under the standalone
>>> mode with 12 GB assigned per executor and 4 cores per worker. The
>>> spark.memory.fraction is set to 0.5 and I'm not using any caching.
>>>
>>> What might be the problem here? Since I'm using the version 1.6.0, this
>>> doesn't seem to be related to  SPARK-12155. This problem always happens
>>> during the shuffle read phase.
>>>
>>> Is there a minimum  amount of memory required for executor
>>> (spark.memory.fraction) for shuffle read?
>>>
>>> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
>>> at 
>>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
>>> at 
>>> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
>>> at 
>>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197)
>>> at 
>>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212)
>>> at 
>>> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103)
>>> at 
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483)
>>> at 
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>>> at 
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>
>


Re: strange behavior when I chain data frame transformations

2016-05-13 Thread Ted Yu
In the structure shown, tag is under element.

I wonder if that was a factor.

On Fri, May 13, 2016 at 11:49 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I am using spark-1.6.1.
>
> I create a data frame from a very complicated JSON file. I would assume
> that the query planner would treat both versions of my transformation chains the
> same way.
>
>
> // org.apache.spark.sql.AnalysisException: Cannot resolve column name
> "tag" among (actor, body, generator, pip, id, inReplyTo, link, object,
> objectType, postedTime, provider, retweetCount, twitter_entities, verb);
>
> // DataFrame emptyDF = rawDF.selectExpr("*", “pip.rules.tag")
>
> // .filter(rawDF.col(tagCol).isNull());
>
> DataFrame emptyDF1 = rawDF.selectExpr("*", “pip.rules.tag");
>
> DataFrame emptyDF =  emptyDF1.filter(emptyDF1.col(“tag").isNull());
>
>
> Here is the schema for the gnip structure
>
>  |-- pip: struct (nullable = true)
>
>  ||-- _profile: struct (nullable = true)
>
>  |||-- topics: array (nullable = true)
>
>  ||||-- element: string (containsNull = true)
>
>  ||-- rules: array (nullable = true)
>
>  |||-- element: struct (containsNull = true)
>
>  ||||-- tag: string (nullable = true)
>
>
> Is this a bug ?
>
>
> Andy
>
>
>


Re: Executor memory requirement for reduceByKey

2016-05-13 Thread Sung Hwan Chung
I'll try that, as of now I have a small number of partitions in the order
of 20~40.

It would be great if there's some documentation on the memory requirement
wrt the number of keys and the number of partitions per executor (i.e., the
Spark's internal memory requirement outside of the user space).

Otherwise, it's like shooting in the dark.

On Fri, May 13, 2016 at 7:20 PM, Ted Yu  wrote:

> Have you taken a look at SPARK-11293 ?
>
> Consider using repartition to increase the number of partitions.
>
> FYI
>
> On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung <
> coded...@cs.stanford.edu> wrote:
>
>> Hello,
>>
>> I'm using Spark version 1.6.0 and have trouble with memory when trying to
>> do reducebykey on a dataset with as many as 75 million keys. I.e. I get the
>> following exception when I run the task.
>>
>> There are 20 workers in the cluster. It is running under the standalone
>> mode with 12 GB assigned per executor and 4 cores per worker. The
>> spark.memory.fraction is set to 0.5 and I'm not using any caching.
>>
>> What might be the problem here? Since I'm using the version 1.6.0, this
>> doesn't seem to be related to  SPARK-12155. This problem always happens
>> during the shuffle read phase.
>>
>> Is there a minimum  amount of memory required for executor
>> (spark.memory.fraction) for shuffle read?
>>
>> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
>>  at 
>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
>>  at 
>> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
>>  at 
>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197)
>>  at 
>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212)
>>  at 
>> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103)
>>  at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483)
>>  at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>>  at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>
>


Re: Executor memory requirement for reduceByKey

2016-05-13 Thread Ted Yu
Have you taken a look at SPARK-11293 ?

Consider using repartition to increase the number of partitions.

FYI

On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung 
wrote:

> Hello,
>
> I'm using Spark version 1.6.0 and have trouble with memory when trying to
> do reducebykey on a dataset with as many as 75 million keys. I.e. I get the
> following exception when I run the task.
>
> There are 20 workers in the cluster. It is running under the standalone
> mode with 12 GB assigned per executor and 4 cores per worker. The
> spark.memory.fraction is set to 0.5 and I'm not using any caching.
>
> What might be the problem here? Since I'm using the version 1.6.0, this
> doesn't seem to be related to  SPARK-12155. This problem always happens
> during the shuffle read phase.
>
> Is there a minimum  amount of memory required for executor
> (spark.memory.fraction) for shuffle read?
>
> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
>   at 
> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
>   at 
> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197)
>   at 
> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212)
>   at 
> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
>


Executor memory requirement for reduceByKey

2016-05-13 Thread Sung Hwan Chung
Hello,

I'm using Spark version 1.6.0 and have trouble with memory when trying to
do reducebykey on a dataset with as many as 75 million keys. I.e. I get the
following exception when I run the task.

There are 20 workers in the cluster. It is running under the standalone
mode with 12 GB assigned per executor and 4 cores per worker. The
spark.memory.fraction is set to 0.5 and I'm not using any caching.

What might be the problem here? Since I'm using the version 1.6.0, this
doesn't seem to be related to  SPARK-12155. This problem always happens
during the shuffle read phase.

Is there a minimum  amount of memory required for executor
(spark.memory.fraction) for shuffle read?

java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
at 
org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
at 
org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:197)
at 
org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:212)
at 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:103)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:483)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


API to study key cardinality and distribution and other important statistics about data at certain stage

2016-05-13 Thread Nirav Patel
Hi,

The problem is that every time a job fails or performs poorly at certain stages,
you need to study your data distribution just before THAT stage. An overall look
at the input data set doesn't help very much if you have so many transformations
going on in the DAG. I always end up writing complicated typed code to run the
analysis versus the actual job to identify this. Shouldn't there be a Spark API
to examine this in a better way? After all, it does go through all the records
(in most cases) to perform a transformation or action, so as a side job it could
gather statistics as well when instructed.
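
In the meantime, a common manual approach is to sample the keyed RDD right
before the suspect stage and look at the heaviest keys. A minimal PySpark
sketch with placeholder names (keyed_rdd stands in for the pair RDD feeding
that stage):

sample = keyed_rdd.sample(withReplacement=False, fraction=0.01)
key_counts = sample.map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)

# Top 20 keys in the sample, as a quick skew check.
print(key_counts.takeOrdered(20, key=lambda kv: -kv[1]))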

Thanks




strange behavior when I chain data frame transformations

2016-05-13 Thread Andy Davidson
I am using spark-1.6.1.

I create a data frame from a very complicated JSON file. I would assume that
the query planner would treat both versions of my transformation chains the same
way.


// org.apache.spark.sql.AnalysisException: Cannot resolve column name "tag"
among (actor, body, generator, pip, id, inReplyTo, link, object, objectType,
postedTime, provider, retweetCount, twitter_entities, verb);

// DataFrame emptyDF = rawDF.selectExpr("*", “pip.rules.tag")

// .filter(rawDF.col(tagCol).isNull());

DataFrame emptyDF1 = rawDF.selectExpr("*", “pip.rules.tag");

DataFrame emptyDF =  emptyDF1.filter(emptyDF1.col(“tag").isNull());



Here is the schema for the gnip structure

 |-- pip: struct (nullable = true)

 ||-- _profile: struct (nullable = true)

 |||-- topics: array (nullable = true)

 ||||-- element: string (containsNull = true)

 ||-- rules: array (nullable = true)

 |||-- element: struct (containsNull = true)

 ||||-- tag: string (nullable = true)



Is this a bug ?



Andy






Re: Tracking / estimating job progress

2016-05-13 Thread Dood

On 5/13/2016 10:39 AM, Anthony May wrote:
It looks like it might only be available via REST, 
http://spark.apache.org/docs/latest/monitoring.html#rest-api


Nice, thanks!



On Fri, 13 May 2016 at 11:24 Dood@ODDO  wrote:

On 5/13/2016 10:16 AM, Anthony May wrote:
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkStatusTracker
>
> Might be useful

How do you use it? You cannot instantiate the class - is the constructor
private? Thanks!

>
> On Fri, 13 May 2016 at 11:11 Ted Yu  wrote:
>
> Have you looked
> at core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ?
>
> Cheers
>
> On Fri, May 13, 2016 at 10:05 AM, Dood@ODDO  wrote:
>
> I provide a RESTful API interface from scalatra for launching
> Spark jobs - part of the functionality is tracking these jobs.
> What API is available to track the progress of a particular
> spark application? How about estimating where in the total job
> progress the job is?
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Tracking / estimating job progress

2016-05-13 Thread Anthony May
It looks like it might only be available via REST,
http://spark.apache.org/docs/latest/monitoring.html#rest-api
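
That said, a status tracker can also be obtained from a live context rather
than constructed directly (sc.statusTracker in Scala). A minimal PySpark
sketch, assuming an existing SparkContext sc:

tracker = sc.statusTracker()
for job_id in tracker.getActiveJobsIds():
    info = tracker.getJobInfo(job_id)
    print("job %d: status=%s stages=%s" % (job_id, info.status, info.stageIds))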

On Fri, 13 May 2016 at 11:24 Dood@ODDO  wrote:

> On 5/13/2016 10:16 AM, Anthony May wrote:
> >
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkStatusTracker
> >
> > Might be useful
>
> How do you use it? You cannot instantiate the class - is the constructor
> private? Thanks!
>
> >
> > On Fri, 13 May 2016 at 11:11 Ted Yu  wrote:
> >
> > Have you looked
> > at
> core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
> > ?
> >
> > Cheers
> >
> > On Fri, May 13, 2016 at 10:05 AM, Dood@ODDO  wrote:
> >
> > I provide a RESTful API interface from scalatra for launching
> > Spark jobs - part of the functionality is tracking these jobs.
> > What API is available to track the progress of a particular
> > spark application? How about estimating where in the total job
> > progress the job is?
> >
> > Thanks!
> >
> >
>  -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > 
> > For additional commands, e-mail: user-h...@spark.apache.org
> > 
> >
> >
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Tracking / estimating job progress

2016-05-13 Thread Dood

On 5/13/2016 10:16 AM, Anthony May wrote:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkStatusTracker

Might be useful


How do you use it? You cannot instantiate the class - is the constructor 
private? Thanks!




On Fri, 13 May 2016 at 11:11 Ted Yu  wrote:


Have you looked
at core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
?

Cheers

On Fri, May 13, 2016 at 10:05 AM, Dood@ODDO  wrote:

I provide a RESTful API interface from scalatra for launching
Spark jobs - part of the functionality is tracking these jobs.
What API is available to track the progress of a particular
spark application? How about estimating where in the total job
progress the job is?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Tracking / estimating job progress

2016-05-13 Thread Anthony May
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkStatusTracker

Might be useful

On Fri, 13 May 2016 at 11:11 Ted Yu  wrote:

> Have you looked
> at core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ?
>
> Cheers
>
> On Fri, May 13, 2016 at 10:05 AM, Dood@ODDO  wrote:
>
>> I provide a RESTful API interface from scalatra for launching Spark jobs
>> - part of the functionality is tracking these jobs. What API is available
>> to track the progress of a particular spark application? How about
>> estimating where in the total job progress the job is?
>>
>> Thanks!
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Tracking / estimating job progress

2016-05-13 Thread Ted Yu
Have you looked
at core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ?

Cheers

On Fri, May 13, 2016 at 10:05 AM, Dood@ODDO  wrote:

> I provide a RESTful API interface from scalatra for launching Spark jobs -
> part of the functionality is tracking these jobs. What API is available to
> track the progress of a particular spark application? How about estimating
> where in the total job progress the job is?
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Tracking / estimating job progress

2016-05-13 Thread Dood
I provide a RESTful API interface from scalatra for launching Spark jobs 
- part of the functionality is tracking these jobs. What API is 
available to track the progress of a particular spark application? How 
about estimating where in the total job progress the job is?


Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



pandas dataframe broadcasted. giving errors in datanode function called kernel

2016-05-13 Thread abi
The pandas dataframe is broadcast successfully, but errors occur in the datanode
function called kernel.

Code:

dataframe_broadcast  = sc.broadcast(dataframe)

def kernel():
    df_v = dataframe_broadcast.value


Error:

I get this error when I try accessing the value member of the broadcast
variable. Apparently it does not have a value, hence it tries to load from
the file again.

  File
"C:\spark-1.6.1-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\broadcast.py",
line 97, in value
self._value = self.load(self._path)
  File
"C:\spark-1.6.1-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\broadcast.py",
line 88, in load
return pickle.load(f)
ImportError: No module named indexes.base

at
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at
org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at
org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
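
This particular ImportError usually points to a pandas version mismatch: the
DataFrame was pickled with a newer pandas on the driver and cannot be
unpickled by an older pandas on the executors. Besides aligning the pandas
versions, one workaround is to broadcast plain structures and rebuild the
frame inside the task; a rough sketch, reusing dataframe and sc from the code
above:

bc = sc.broadcast({"columns": list(dataframe.columns),
                   "records": dataframe.to_dict("records")})

def kernel(_):
    import pandas as pd
    # Rebuild the DataFrame locally from plain dicts/lists.
    df_v = pd.DataFrame(bc.value["records"], columns=bc.value["columns"])
    # ... processing with df_v ...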



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/pandas-dataframe-broadcasted-giving-errors-in-datanode-function-called-kernel-tp26953.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to get and save core dump of native library in executors

2016-05-13 Thread prateek arora
I am running my cluster on Ubuntu 14.04

Regards
Prateek



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-get-and-save-core-dump-of-native-library-in-executors-tp26945p26952.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Creating Nested dataframe from flat data.

2016-05-13 Thread Prashant Bhardwaj
Thank you. That's exactly what I was looking for.

Regards
Prashant

On Fri, May 13, 2016 at 9:38 PM, Xinh Huynh  wrote:

> Hi Prashant,
>
> You can create struct columns using the struct() function in
> org.apache.spark.sql.functions --
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$
>
> val df = sc.parallelize(List(("a", "b", "c"))).toDF("A", "B", "C")
>
> import org.apache.spark.sql.functions._
> df.withColumn("D", struct($"a", $"b", $"c")).show()
>
> +---+---+---+-------+
> |  A|  B|  C|      D|
> +---+---+---+-------+
> |  a|  b|  c|[a,b,c]|
> +---+---+---+-------+
>
> You can repeat to get the inner nesting.
>
> Xinh
>
> On Fri, May 13, 2016 at 4:51 AM, Prashant Bhardwaj <
> prashant2006s...@gmail.com> wrote:
>
>> Hi
>>
>> Let's say I have a flat dataframe with 6 columns like.
>> {
>> "a": "somevalue",
>> "b": "somevalue",
>> "c": "somevalue",
>> "d": "somevalue",
>> "e": "somevalue",
>> "f": "somevalue"
>> }
>>
>> Now I want to convert this dataframe to contain nested column like
>>
>> {
>> "nested_obj1": {
>> "a": "somevalue",
>> "b": "somevalue"
>> },
>> "nested_obj2": {
>> "c": "somevalue",
>> "d": "somevalue",
>> "nested_obj3": {
>> "e": "somevalue",
>> "f": "somevalue"
>> }
>> }
>> }
>>
>> How can I achieve this? I'm using Spark-sql in scala.
>>
>> Regards
>> Prashant
>>
>
>


Re: Spark 2.0.0-snapshot: IllegalArgumentException: requirement failed: chunks must be non-empty

2016-05-13 Thread Ted Yu
Is it possible to come up with a code snippet which reproduces the following?

Thanks

On Fri, May 13, 2016 at 8:13 AM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> I am able to run my application after I compiled Spark source in the
> following way
>
> ./dev/change-scala-version.sh 2.11
>
> ./dev/make-distribution.sh --name spark-2.0.0-snapshot-bin-hadoop2.6 --tgz
> -Phadoop-2.6 -DskipTests
>
> But while the application is running I get the following exception, which
> I was not getting with Spark 1.6.1. Any idea why this might be happening?
>
> java.lang.IllegalArgumentException: requirement failed: chunks must be
> non-empty
>
> at scala.Predef$.require(Predef.scala:224)
>
> at
> org.apache.spark.util.io.ChunkedByteBuffer.(ChunkedByteBuffer.scala:41)
>
> at
> org.apache.spark.util.io.ChunkedByteBuffer.(ChunkedByteBuffer.scala:52)
>
> at
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:580)
>
> at
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:514)
>
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:601)
>
> at
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:653)
>
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:329)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:280)
>
> at
> org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:100)
>
> at
> org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:99)
>
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>
> at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:20)
>
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>
> at scala.collection.AbstractIterator.to(Iterator.scala:1336)
>
> at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:304)
>
> at scala.collection.AbstractIterator.toSet(Iterator.scala:1336)
>
> at
> org.daselab.sparkel.SparkELHDFSTestCopy$$anonfun$45.apply(SparkELHDFSTestCopy.scala:392)
>
> at
> org.daselab.sparkel.SparkELHDFSTestCopy$$anonfun$45.apply(SparkELHDFSTestCopy.scala:391)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> On Fri, May 13, 2016 at 6:33 AM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Thank you for the response.
>>
>> I used the following command to build from source
>>
>> build/mvn -Dhadoop.version=2.6.4 -Phadoop-2.6 -DskipTests clean package
>>
>> Would this put in the required jars in .ivy2 during the build process? If
>> so, how can I make the spark distribution runnable, so that I can use it on
>> other machines as well (make-distribution.sh no longer exists in Spark root
>> folder)?
>>
>> For compiling my application, I put in the following lines in the
>> build.sbt
>>
>> packAutoSettings
>> val spark = "org.apache.spark" %% "spark-core" % "2.0.0-SNAPSHOT"
>> val sparksql = "org.apache.spark" % "spark-sql_2.11" % "2.0.0-SNAPSHOT"
>>
>> lazy val root = (project in file(".")).
>>   settings(
>> name := "sparkel",
>> version := "0.1.0",
>> scalaVersion := "2.11.8",
>> libraryDependencies += spark,
>> libraryDependencies += sparksql
>>   )
>>
>>
>> Regards,
>> Raghava.
>>
>>
>> On Fri, May 13, 2016 at 12:23 AM, Luciano Resende 
>> wrote:
>>
>>> Spark has moved to build using Scala 2.11 by default in master/trunk.
>>>
>>> 

Re: Creating Nested dataframe from flat data.

2016-05-13 Thread Xinh Huynh
Hi Prashant,

You can create struct columns using the struct() function in
org.apache.spark.sql.functions --
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$

val df = sc.parallelize(List(("a", "b", "c"))).toDF("A", "B", "C")

import org.apache.spark.sql.functions._
df.withColumn("D", struct($"a", $"b", $"c")).show()

+---+---+---+-------+
|  A|  B|  C|      D|
+---+---+---+-------+
|  a|  b|  c|[a,b,c]|
+---+---+---+-------+

You can repeat to get the inner nesting.

Xinh

On Fri, May 13, 2016 at 4:51 AM, Prashant Bhardwaj <
prashant2006s...@gmail.com> wrote:

> Hi
>
> Let's say I have a flat dataframe with 6 columns like.
> {
> "a": "somevalue",
> "b": "somevalue",
> "c": "somevalue",
> "d": "somevalue",
> "e": "somevalue",
> "f": "somevalue"
> }
>
> Now I want to convert this dataframe to contain nested column like
>
> {
> "nested_obj1": {
> "a": "somevalue",
> "b": "somevalue"
> },
> "nested_obj2": {
> "c": "somevalue",
> "d": "somevalue",
> "nested_obj3": {
> "e": "somevalue",
> "f": "somevalue"
> }
> }
> }
>
> How can I achieve this? I'm using Spark-sql in scala.
>
> Regards
> Prashant
>


Re: Joining a RDD to a Dataframe

2016-05-13 Thread Xinh Huynh
Hi Cyril,

In the case where there are no documents, it looks like there is a typo in
"addresses" (check the number of "d"s):

| scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))  <==
addresses
| org.apache.spark.sql.AnalysisException: Cannot resolve column name "id"
among (adresses); <== adresses

As for your question about joining on a nested array column, I don't know
if it is possible. Is it supported in normal SQL? Exploding seems the right
way because then there is only one join key per row, as opposed to the
array, which could have multiple join keys inside the array.

Xinh

On Thu, May 12, 2016 at 7:32 PM, Cyril Scetbon 
wrote:

> Nobody has the answer ?
>
> Another thing I've seen is that if I have no documents at all :
>
> scala> df.select(explode(df("addresses.id")).as("aid")).collect
> res27: Array[org.apache.spark.sql.Row] = Array()
>
> Then
>
> scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "id"
> among (adresses);
> at
> org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
> at
> org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
>
> Is there a better way to query nested objects and to join between a DF
> containing nested objects and another regular data frame (yes it's the
> current case)
>
> On May 9, 2016, at 00:42, Cyril Scetbon  wrote:
>
> Hi Ashish,
>
> The issue is not related to converting a RDD to a DF. I did it. I was just
> asking if I should do it differently.
>
> The issue regards the exception when using array_contains with a
> sql.Column instead of a value.
>
> I found another way to do it using explode as follows :
>
> df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input,
> $"aid" === df_input("id")).select(df("id"))
>
> However, I'm wondering if it does almost the same or if the query is
> different and worst in term of performance.
>
> If someone can comment on it and maybe give me advices.
>
> Thank you.
>
> On May 8, 2016, at 22:12, Ashish Dubey  wrote:
>
> Is there any reason you dont want to convert this - i dont think join b/w
> RDD and DF is supported.
>
> On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon 
> wrote:
>
>> Hi,
>>
>> I have a RDD built during a spark streaming job and I'd like to join it
>> to a DataFrame (E/S input) to enrich it.
>> It seems that I can't join the RDD and the DF without converting first
>> the RDD to a DF (Tell me if I'm wrong). Here are the schemas of both DF :
>>
>> scala> df
>> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses:
>> array>, id: string]
>>
>> scala> df_input
>> res33: org.apache.spark.sql.DataFrame = [id: string]
>>
>> scala> df_input.collect
>> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2],
>> [idaddress12])
>>
>> I can get ids I want if I know the value to look for in addresses.id
>> using :
>>
>> scala> df.filter(array_contains(df("addresses.id"),
>> "idaddress2")).select("id").collect
>> res35: Array[org.apache.spark.sql.Row] = Array([], [YY])
>>
>> However when I try to join df_input and df and to use the previous filter
>> as the join condition I get an exception :
>>
>> scala> df.join(df_input, array_contains(df("adresses.id"),
>> df_input("id")))
>> java.lang.RuntimeException: Unsupported literal type class
>> org.apache.spark.sql.Column id
>> at
>> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
>> at
>> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
>> ...
>>
>> It seems that array_contains only supports static arguments and does not
>> replace a sql.Column by its value.
>>
>> What's the best way to achieve what I want to do ? (Also speaking in term
>> of performance)
>>
>> Thanks
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
>


Spark 2.0.0-snapshot: IllegalArgumentException: requirement failed: chunks must be non-empty

2016-05-13 Thread Raghava Mutharaju
I am able to run my application after I compiled Spark source in the
following way

./dev/change-scala-version.sh 2.11

./dev/make-distribution.sh --name spark-2.0.0-snapshot-bin-hadoop2.6 --tgz
-Phadoop-2.6 -DskipTests

But while the application is running I get the following exception, which I
was not getting with Spark 1.6.1. Any idea why this might be happening?

java.lang.IllegalArgumentException: requirement failed: chunks must be
non-empty

at scala.Predef$.require(Predef.scala:224)

at
org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:41)

at
org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:52)

at
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:580)

at
org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:514)

at org.apache.spark.storage.BlockManager.get(BlockManager.scala:601)

at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:653)

at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:329)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:280)

at
org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:100)

at
org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:99)

at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)

at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:20)

at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)

at scala.collection.AbstractIterator.to(Iterator.scala:1336)

at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:304)

at scala.collection.AbstractIterator.toSet(Iterator.scala:1336)

at
org.daselab.sparkel.SparkELHDFSTestCopy$$anonfun$45.apply(SparkELHDFSTestCopy.scala:392)

at
org.daselab.sparkel.SparkELHDFSTestCopy$$anonfun$45.apply(SparkELHDFSTestCopy.scala:391)

at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)

at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)

at org.apache.spark.scheduler.Task.run(Task.scala:85)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

On Fri, May 13, 2016 at 6:33 AM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Thank you for the response.
>
> I used the following command to build from source
>
> build/mvn -Dhadoop.version=2.6.4 -Phadoop-2.6 -DskipTests clean package
>
> Would this put in the required jars in .ivy2 during the build process? If
> so, how can I make the spark distribution runnable, so that I can use it on
> other machines as well (make-distribution.sh no longer exists in Spark root
> folder)?
>
> For compiling my application, I put in the following lines in the build.sbt
>
> packAutoSettings
> val spark = "org.apache.spark" %% "spark-core" % "2.0.0-SNAPSHOT"
> val sparksql = "org.apache.spark" % "spark-sql_2.11" % "2.0.0-SNAPSHOT"
>
> lazy val root = (project in file(".")).
>   settings(
> name := "sparkel",
> version := "0.1.0",
> scalaVersion := "2.11.8",
> libraryDependencies += spark,
> libraryDependencies += sparksql
>   )
>
>
> Regards,
> Raghava.
>
>
> On Fri, May 13, 2016 at 12:23 AM, Luciano Resende 
> wrote:
>
>> Spark has moved to build using Scala 2.11 by default in master/trunk.
>>
>> As for the 2.0.0-SNAPSHOT, it is actually the version of master/trunk and
>> you might be missing some modules/profiles for your build. What command did
>> you use to build ?
>>
>> On Thu, May 12, 2016 at 9:01 PM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> I built Spark from the source code available at
>>> https://github.com/apache/spark/. Although I 

Re: Kafka partition increased while Spark Streaming is running

2016-05-13 Thread chandan prakash
Makes sense. thank you cody.

Regards,
Chandan

On Fri, May 13, 2016 at 8:10 PM, Cody Koeninger  wrote:

> No, I wouldn't expect it to, once the stream is defined (at least for
> the direct stream integration for kafka 0.8), the topicpartitions are
> fixed.
>
> My answer to any question about "but what if checkpoints don't let me
> do this" is always going to be "well, don't rely on checkpoints."
>
> If you want dynamic topicpartitions,
> https://issues.apache.org/jira/browse/SPARK-12177
>
>
> On Fri, May 13, 2016 at 4:24 AM, chandan prakash
>  wrote:
> > Follow up question :
> >
> >  If spark streaming is using checkpointing (/tmp/checkpointDir)  for
> > AtLeastOnce and  number of Topics or/and partitions has increased
> then
> >  will gracefully shutting down and restarting from checkpoint will
> consider
> > new topics or/and partitions ?
> >  If the answer is NO then how to start from the same checkpoint with new
> > partitions/topics included?
> >
> > Thanks,
> > Chandan
> >
> >
> > On Wed, Feb 24, 2016 at 9:30 PM, Cody Koeninger 
> wrote:
> >>
> >> That's correct, when you create a direct stream, you specify the
> >> topicpartitions you want to be a part of the stream (the other method
> for
> >> creating a direct stream is just a convenience wrapper).
> >>
> >> On Wed, Feb 24, 2016 at 2:15 AM, 陈宇航  wrote:
> >>>
> >>> Here I use the 'KafkaUtils.createDirectStream' to integrate Kafka with
> >>> Spark Streaming. I submitted the app, then I changed (increased)
> Kafka's
> >>> partition number after it's running for a while. Then I check the input
> >>> offset with 'rdd.asInstanceOf[HasOffsetRanges].offsetRanges', seeing
> that
> >>> only the offset of the initial partitions are returned.
> >>>
> >>> Does this mean Spark Streaming's Kafka integration can't update its
> >>> parallelism when Kafka's partition number is changed?
> >>
> >>
> >
> >
> >
> > --
> > Chandan Prakash
> >
>



-- 
Chandan Prakash


memory leak exception

2016-05-13 Thread Imran Akbar
I'm trying to save a table using this code in pyspark with 1.6.1:

prices = sqlContext.sql("SELECT AVG(amount) AS mean_price, country FROM src
GROUP BY country")
prices.collect()
prices.write.saveAsTable('prices', format='parquet', mode='overwrite',
path='/mnt/bigdisk/tables')

but I'm getting this error:

16/05/13 02:04:24 INFO HadoopRDD: Input split:
file:/mnt/bigdisk/src.csv:100663296+33554432

16/05/13 02:04:33 WARN TaskMemoryManager: leak 68.0 MB memory from
org.apache.spark.unsafe.map.BytesToBytesMap@f9f1b5e

16/05/13 02:04:33 ERROR Executor: Managed memory leak detected; size =
71303168 bytes, TID = 4085

16/05/13 02:04:33 ERROR Executor: Exception in task 2.0 in stage 35.0 (TID
4085)

java.io.FileNotFoundException:
/mnt/bigdisk/spark_tmp/blockmgr-69da47e4-3a75-4244-80d3-9c7c0943e7f8/25/temp_shuffle_77078209-a2c5-466c-bba1-ff1a700f257c
(No such file or directory)

at java.io.FileOutputStream.open(Native Method)

at java.io.FileOutputStream.<init>(FileOutputStream.java:221)

at
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)

at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:89)


any ideas what could be wrong?


thanks,

imran


Re: Kafka partition increased while Spark Streaming is running

2016-05-13 Thread Cody Koeninger
No, I wouldn't expect it to, once the stream is defined (at least for
the direct stream integration for kafka 0.8), the topicpartitions are
fixed.

My answer to any question about "but what if checkpoints don't let me
do this" is always going to be "well, don't rely on checkpoints."

If you want dynamic topicpartitions,
https://issues.apache.org/jira/browse/SPARK-12177


On Fri, May 13, 2016 at 4:24 AM, chandan prakash
 wrote:
> Follow up question :
>
>  If spark streaming is using checkpointing (/tmp/checkpointDir)  for
> AtLeastOnce and  number of Topics or/and partitions has increased  then
>  will gracefully shutting down and restarting from checkpoint will consider
> new topics or/and partitions ?
>  If the answer is NO then how to start from the same checkpoint with new
> partitions/topics included?
>
> Thanks,
> Chandan
>
>
> On Wed, Feb 24, 2016 at 9:30 PM, Cody Koeninger  wrote:
>>
>> That's correct, when you create a direct stream, you specify the
>> topicpartitions you want to be a part of the stream (the other method for
>> creating a direct stream is just a convenience wrapper).
>>
>> On Wed, Feb 24, 2016 at 2:15 AM, 陈宇航  wrote:
>>>
>>> Here I use the 'KafkaUtils.createDirectStream' to integrate Kafka with
>>> Spark Streaming. I submitted the app, then I changed (increased) Kafka's
>>> partition number after it's running for a while. Then I check the input
>>> offset with 'rdd.asInstanceOf[HasOffsetRanges].offsetRanges', seeing that
>>> only the offset of the initial partitions are returned.
>>>
>>> Does this mean Spark Streaming's Kafka integration can't update its
>>> parallelism when Kafka's partition number is changed?
>>
>>
>
>
>
> --
> Chandan Prakash
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: The metastore database gives errors when start spark-sql CLI.

2016-05-13 Thread Mich Talebzadeh
I do not know Postgres but that sounds like a system table much like Oracle
v$instance?

Why should running a Hive schema script against a Hive schema/DB in Postgres
impact the system schema?

Mine is Oracle

s...@mydb12.mich.LOCAL> SELECT version FROM v$instance;
VERSION
-
12.1.0.2.0

Otherwise just drop and recreate Hive DB/schema. However, check that the
problem with $instance has gone away.

This sounds like a system table corruption?

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 13 May 2016 at 09:09, Joseph  wrote:

>
> Hi all,
>
> I use PostgreSQL to store the hive metadata.
>
> First, I imported a sql script to metastore database as follows:
> psql -U postgres -d metastore -h 192.168.50.30 -f
> *hive-schema-1.2.0.postgres.sql*
>
> Then, when I started $SPARK_HOME/bin/*spark-sql*,  the PostgreSQL  gave
> the following errors:
>
> ERROR:  syntax error at or near "@@" at character 5
> STATEMENT:  SET @@session.sql_mode=ANSI_QUOTES
> ERROR:  relation "v$instance" does not exist at character 21
> STATEMENT:  SELECT version FROM v$instance
> ERROR:  column "version" does not exist at character 10
> STATEMENT:  SELECT @@version
>
> This does not affect normal use, but maybe it is a bug! ( I use spark
> 1.6.1 and  hive 1.2.1)
>
> --
> Joseph
>


Re: Confused - returning RDDs from functions

2016-05-13 Thread Dood

  
  
On 5/12/2016 10:01 PM, Holden Karau wrote:
> This is not the expected behavior, can you maybe post
> the code where you are running into this?

Hello, thanks for replying!

Below is the function I took out from the code.

def converter(rdd: RDD[(String, JsValue)], param:String): RDD[(String, Int)] = {
  // I am breaking this down for future readability and ease of optimization
  // as a first attempt at solving this problem, I am not concerned with performance
  // and pretty, more with accuracy ;)
  // r1 will be an RDD containing only the "param" method of selection
  val r1:RDD[(String,JsValue)] = rdd.filter(x => (x._2 \ "field1" \ "field2").as[String].replace("\"","") == param.replace("\"",""))
  // r2 will be an RDD of Lists of fields (A1-Z1) with associated counts
  // remapFields returns a List[(String,Int)]
  val r2:RDD[List[(String,Int)]] = r1.map(x => remapFields(x._2 \ "extra"))
  // r3 will be flattened to enable grouping
  val r3:RDD[(String,Int)] = r2.flatMap(x => x)
  // now we can group by entity
  val r4:RDD[(String,Iterable[(String,Int)])] = r3.groupBy(x => x._1)
  // and produce a mapping of entity -> count pairs
  val r5:RDD[(String,Int)] = r4.map(x => (x._1, x._2.map(y => y._2).sum))
  // return the result
  r5
}

If I call on the above function and collectAsMap on the returned
RDD, I get an empty Map(). If I copy/paste this code into the
caller, I get the properly filled in Map.

I am fairly new to Spark and Scala so excuse any inefficiencies - my
priority was to be able to solve the problem in an obvious and
correct way and worry about making it pretty later. 

Thanks!

On Thursday, May 12, 2016, Dood@ODDO wrote:
  Hello all,

I have been programming for years but this has me baffled.

I have an RDD[(String,Int)] that I return from a function after
extensive manipulation of an initial RDD of a different type.
When I return this RDD and initiate the .collectAsMap() on it
from the caller, I get an empty Map().

If I copy and paste the code from the function into the caller
(same exact code) and produce the same RDD and call
collectAsMap() on it, I get the Map with all the expected
information in it.

What gives?

Does Spark defy programming principles or am I crazy? ;-)

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

  
  
  
-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSql Catalyst extending Analyzer, Error with CatalystConf

2016-05-13 Thread sib
Hello,

I am trying to write a basic analyzer, by extending the catalyst analyzer
with a few extra rules.

I am getting the following error:
*""" trait CatalystConf in package catalyst cannot be accessed in package
org.apache.spark.sql.catalyst """*


In my attempt I am doing the following:

class CustomSQLContext(sc: SparkContext) extends SQLContext(sc) {
val an = new CustomAnalyzer(Map("testRule" ->
testRule),catalog,functionRegistry,conf)
override lazy val analyzer: Analyzer = an
  }

class CustomAnalyzer(rules: Map[String, Rule[LogicalPlan]], catalog:
Catalog, registery: FunctionRegistry, conf: CatalystConf )
extends Analyzer( catalog, registery, conf) {
..
override lazy val batches = my_batch.toSeq ++ default_batches ++ Nil
  }

Any ideas how I can pass the conf to the customAnalyzer without this error?

I tried passing it as SQLConf but get a not found error and importing
doesn't seem to work.


Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSql-Catalyst-extending-Analyzer-Error-with-CatalystConf-tp26950.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Datasets is extremely slow in comparison to RDD in standalone mode WordCount examlpe

2016-05-13 Thread Amit Sela
Taking it to a more basic level, I compared a simple transformation
with RDDs and with Datasets. This is far simpler than Renato's use case and
it brings up two good questions:
1. Is the time it takes to "spin-up" a standalone instance of Spark(SQL)
just an additional one-time overhead - which would be reasonable, especially for
the first version of Datasets?
2. Are Datasets, in some cases, slower than RDDs? If so, in which cases, and why?

*Datasets code*: ~2000 msec
SQLContext sqc = createSQLContext(createContext());
sqc.createDataset(WORDS, Encoders.STRING())
    .map(new MapFunction<String, String>() {
      @Override
      public String call(String value) throws Exception {
        return value.toUpperCase();
      }
    }, Encoders.STRING())
    .show();

*RDDs code*: < 500 msec
JavaSparkContext jsc = createContext();
List<String> res = jsc.parallelize(WORDS)
    .map(new Function<String, String>() {
      @Override
      public String call(String v1) throws Exception {
        return v1.toUpperCase();
      }
    })
    .collect();

*Those are the context creation functions:*
  static SQLContext createSQLContext(JavaSparkContext jsc) {
    return new SQLContext(jsc);
  }
  static JavaSparkContext createContext() {
    return new JavaSparkContext(new SparkConf()
        .setMaster("local[*]")
        .setAppName("WordCount")
        .set("spark.ui.enabled", "false"));
  }
*And the input:*
  List<String> WORDS = Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "bob hi");

On Thu, May 12, 2016 at 12:04 PM Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> Hi Amit,
>
> This is very interesting indeed because I have got similar resutls. I
> tried doing a filtter + groupBy using DataSet with a function, and using
> the inner RDD of the DF(RDD[row]). I used the inner RDD of a DataFrame
> because apparently there is no straight-forward way to create an RDD of
> Parquet data without creating a sqlContext. if anybody has some code to
> share with me, please share (:
> I used 1GB of parquet data and when doing the operations with the RDD it
> was much faster. After looking at the execution plans, it is clear why
> DataSets do worse. For using them an extra map operation is done to map row
> objects into the defined case class. Then the DataSet uses the whole query
> optimization platform (Catalyst and move objects in and out of Tungsten).
> Thus, I think for operations that are too "simple", it is more expensive to
> use the entire DS/DF infrastructure than the inner RDD.
> IMHO if you have complex SQL queries, it makes sense you use DS/DF but if
> you don't, then probably using RDDs directly is still faster.
>
>
> Renato M.
>
> 2016-05-11 20:17 GMT+02:00 Amit Sela :
>
>> Some how missed that ;)
>> Anything about Datasets slowness ?
>>
>> On Wed, May 11, 2016, 21:02 Ted Yu  wrote:
>>
>>> Which release are you using ?
>>>
>>> You can use the following to disable UI:
>>> --conf spark.ui.enabled=false
>>>
>>> On Wed, May 11, 2016 at 10:59 AM, Amit Sela 
>>> wrote:
>>>
 I've ran a simple WordCount example with a very small List as
 input lines and ran it in standalone (local[*]), and Datasets is very 
 slow..
 We're talking ~700 msec for RDDs while Datasets takes ~3.5 sec.
 Is this just start-up overhead ? please note that I'm not timing the
 context creation...

 And in general, is there a way to run with local[*] "lightweight" mode
 for testing ? something like without the WebUI server for example (and
 anything else that's not needed for testing purposes)

 Thanks,
 Amit

>>>
>>>
>



Re: Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread Tom Godden
I corrected the type to RDD, but it's still giving
me the error.
I believe I have found the reason though. The vals variable is created
using the map procedure on some other RDD. Although it is declared as a
JavaRDD, the classTag it returns is Object. I think
that because of this, the RDD returned from sliding() only accepts
Object as a type.
I have no idea how to fix this though.

On 13-05-16 13:12, Sean Owen wrote:
> The Java docs won't help since they only show "Object", yes. Have a
> look at the Scala docs:
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions
>
> An RDD of T produces an RDD of T[].
>
> On Fri, May 13, 2016 at 12:10 PM, Tom Godden  wrote:
>> I assumed the "fixed size blocks" mentioned in the documentation
>> (https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html#sliding%28int,%20int%29)
>> were RDDs, but I guess they're arrays? Even when I change the RDD to
>> arrays (so it looks like RDD), it doesn't work.
>> I'm passing an RDD of ArrayLists of Integers to the sliding functions,
>> so that's where the ArrayList comes from.
>> I can't seem to find up to date example code, could you maybe give an
>> example?
>>
>> On 13-05-16 12:53, Sean Owen wrote:
>>> I'm not sure what you're trying there. The return type is an RDD of
>>> arrays, not of RDDs or of ArrayLists. There may be another catch but
>>> that is not it.
>>>
>>> On Fri, May 13, 2016 at 11:50 AM, Tom Godden  wrote:
 I believe it's an illegal cast. This is the line of code:
> RDD> windowed =
> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
 with vals being a JavaRDD.  Explicitly casting
 doesn't work either:
> RDD> windowed = (RDD>)
> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
 Did I miss something?

 On 13-05-16 09:44, Sean Owen wrote:
> The problem is there's no Java-friendly version of this, and the Scala
> API return type actually has no analog in Java (an array of any type,
> not just of objects) so it becomes Object. You can just cast it to the
> type you know it will be -- RDD or RDD or whatever.
>
> On Fri, May 13, 2016 at 8:40 AM, tgodden  wrote:
>> Hello,
>>
>> We're trying to use PrefixSpan on sequential data, by passing a sliding
>> window over it. Spark Streaming is not an option.
>> RDDFunctions.sliding() returns an item of class RDD,
>> regardless of the original type of the RDD. Because of this, the
>> returned item seems to be pretty much worthless.
>> Is this a bug/nyi? Is there a way to circumvent this somehow?
>>
>> Official docs:
>> https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
>>
>> Thanks
>>
>> 
>> View this message in context: Java: Return type of 
>> RDDFunctions.sliding(int,
>> int)
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Creating Nested dataframe from flat data.

2016-05-13 Thread Prashant Bhardwaj
Hi

Let's say I have a flat dataframe with 6 columns like.
{
"a": "somevalue",
"b": "somevalue",
"c": "somevalue",
"d": "somevalue",
"e": "somevalue",
"f": "somevalue"
}

Now I want to convert this dataframe to contain nested column like

{
"nested_obj1": {
"a": "somevalue",
"b": "somevalue"
},
"nested_obj2": {
"c": "somevalue",
"d": "somevalue",
"nested_obj3": {
"e": "somevalue",
"f": "somevalue"
}
}
}

How can I achieve this? I'm using Spark-sql in scala.
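
One possible way to build the nested structure (an untested sketch, assuming the flat
DataFrame is called flatDF and has the six columns above) is to compose nested columns
with the struct function:

import org.apache.spark.sql.functions.struct

// Sketch only: struct() packs columns into a nested column; nesting struct()
// calls gives the doubly nested object.
val nestedDF = flatDF.select(
  struct(flatDF("a"), flatDF("b")).as("nested_obj1"),
  struct(
    flatDF("c"),
    flatDF("d"),
    struct(flatDF("e"), flatDF("f")).as("nested_obj3")
  ).as("nested_obj2")
)
nestedDF.printSchema()
nestedDF.toJSON.take(1).foreach(println)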

Regards
Prashant


Re: ANOVA test in Spark

2016-05-13 Thread mylisttech
Mayank,

Assuming ANOVA is not present in MLlib, can you not exploit the ANOVA from SparkR? I 
am enquiring, not making a factual statement.

Thanks 



On May 13, 2016, at 15:54, mayankshete  wrote:

> Is ANOVA present in Spark Mllib if not then, when will be this feature be
> available in Spark ?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/ANOVA-test-in-Spark-tp26949.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread Sean Owen
The Java docs won't help since they only show "Object", yes. Have a
look at the Scala docs:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions

An RDD of T produces an RDD of T[].
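
For reference, a minimal (untested) Scala sketch of what that looks like with the
implicit conversion from the docs linked above; sc is assumed to be a SparkContext:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD

// Sketch only: sliding over an RDD[Int] with window 20 and step 1 yields an
// RDD[Array[Int]] -- an RDD of T produces an RDD of T[].
def windows(sc: SparkContext): RDD[Array[Int]] = {
  val vals: RDD[Int] = sc.parallelize(1 to 100)
  vals.sliding(20, 1)
}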

On Fri, May 13, 2016 at 12:10 PM, Tom Godden  wrote:
> I assumed the "fixed size blocks" mentioned in the documentation
> (https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html#sliding%28int,%20int%29)
> were RDDs, but I guess they're arrays? Even when I change the RDD to
> arrays (so it looks like RDD), it doesn't work.
> I'm passing an RDD of ArrayLists of Integers to the sliding functions,
> so that's where the ArrayList comes from.
> I can't seem to find up to date example code, could you maybe give an
> example?
>
> On 13-05-16 12:53, Sean Owen wrote:
>> I'm not sure what you're trying there. The return type is an RDD of
>> arrays, not of RDDs or of ArrayLists. There may be another catch but
>> that is not it.
>>
>> On Fri, May 13, 2016 at 11:50 AM, Tom Godden  wrote:
>>> I believe it's an illegal cast. This is the line of code:
 RDD> windowed =
 RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
>>> with vals being a JavaRDD.  Explicitly casting
>>> doesn't work either:
 RDD> windowed = (RDD>)
 RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
>>> Did I miss something?
>>>
>>> On 13-05-16 09:44, Sean Owen wrote:
 The problem is there's no Java-friendly version of this, and the Scala
 API return type actually has no analog in Java (an array of any type,
 not just of objects) so it becomes Object. You can just cast it to the
 type you know it will be -- RDD or RDD or whatever.

 On Fri, May 13, 2016 at 8:40 AM, tgodden  wrote:
> Hello,
>
> We're trying to use PrefixSpan on sequential data, by passing a sliding
> window over it. Spark Streaming is not an option.
> RDDFunctions.sliding() returns an item of class RDD,
> regardless of the original type of the RDD. Because of this, the
> returned item seems to be pretty much worthless.
> Is this a bug/nyi? Is there a way to circumvent this somehow?
>
> Official docs:
> https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
>
> Thanks
>
> 
> View this message in context: Java: Return type of 
> RDDFunctions.sliding(int,
> int)
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread Tom Godden
I assumed the "fixed size blocks" mentioned in the documentation
(https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html#sliding%28int,%20int%29)
were RDDs, but I guess they're arrays? Even when I change the RDD to
arrays (so it looks like RDD), it doesn't work.
I'm passing an RDD of ArrayLists of Integers to the sliding functions,
so that's where the ArrayList comes from.
I can't seem to find up to date example code, could you maybe give an
example?

On 13-05-16 12:53, Sean Owen wrote:
> I'm not sure what you're trying there. The return type is an RDD of
> arrays, not of RDDs or of ArrayLists. There may be another catch but
> that is not it.
>
> On Fri, May 13, 2016 at 11:50 AM, Tom Godden  wrote:
>> I believe it's an illegal cast. This is the line of code:
>>> RDD> windowed =
>>> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
>> with vals being a JavaRDD.  Explicitly casting
>> doesn't work either:
>>> RDD> windowed = (RDD>)
>>> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
>> Did I miss something?
>>
>> On 13-05-16 09:44, Sean Owen wrote:
>>> The problem is there's no Java-friendly version of this, and the Scala
>>> API return type actually has no analog in Java (an array of any type,
>>> not just of objects) so it becomes Object. You can just cast it to the
>>> type you know it will be -- RDD or RDD or whatever.
>>>
>>> On Fri, May 13, 2016 at 8:40 AM, tgodden  wrote:
 Hello,

 We're trying to use PrefixSpan on sequential data, by passing a sliding
 window over it. Spark Streaming is not an option.
 RDDFunctions.sliding() returns an item of class RDD,
 regardless of the original type of the RDD. Because of this, the
 returned item seems to be pretty much worthless.
 Is this a bug/nyi? Is there a way to circumvent this somehow?

 Official docs:
 https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html

 Thanks

 
 View this message in context: Java: Return type of 
 RDDFunctions.sliding(int,
 int)
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Graceful shutdown of spark streaming on yarn

2016-05-13 Thread Rakesh H (Marketing Platform-BLR)
Have you used awaitTermination() on your ssc ? --> Yes, i have used that.
Also try setting the deployment mode to yarn-client. --> Is this not
supported on yarn-cluster mode?  I am trying to find root cause for
yarn-cluster mode.
Have you tested graceful shutdown on yarn-cluster mode?

On Fri, May 13, 2016 at 11:54 AM Deepak Sharma 
wrote:

> Rakesh
> Have you used awaitTermination() on your ssc ?
> If not, add this and see if it changes the behavior.
> I am guessing this issue may be related to yarn deployment mode.
> Also try setting the deployment mode to yarn-client.
>
> Thanks
> Deepak
>
>
> On Fri, May 13, 2016 at 10:17 AM, Rakesh H (Marketing Platform-BLR) <
> rakes...@flipkart.com> wrote:
>
>> Ping!!
>> Has anybody tested graceful shutdown of a spark streaming in yarn-cluster
>> mode?It looks like a defect to me.
>>
>>
>> On Thu, May 12, 2016 at 12:53 PM Rakesh H (Marketing Platform-BLR) <
>> rakes...@flipkart.com> wrote:
>>
>>> We are on spark 1.5.1
>>> Above change was to add a shutdown hook.
>>> I am not adding shutdown hook in code, so inbuilt shutdown hook is being
>>> called.
>>> Driver signals that it is going to to graceful shutdown, but executor
>>> sees that Driver is dead and it shuts down abruptly.
>>> Could this issue be related to yarn? I see correct behavior locally. I
>>> did "yarn kill " to kill the job.
>>>
>>>
>>> On Thu, May 12, 2016 at 12:28 PM Deepak Sharma 
>>> wrote:
>>>
 This is happening because spark context shuts down without shutting
 down the ssc first.
 This was behavior till spark 1.4 and was addressed in later releases.
 https://github.com/apache/spark/pull/6307

 Which version of spark are you on?

 Thanks
 Deepak

 On Thu, May 12, 2016 at 12:14 PM, Rakesh H (Marketing Platform-BLR) <
 rakes...@flipkart.com> wrote:

> Yes, it seems to be the case.
> In this case executors should have continued logging values till 300,
> but they are shutdown as soon as i do "yarn kill .."
>
> On Thu, May 12, 2016 at 12:11 PM Deepak Sharma 
> wrote:
>
>> So in your case , the driver is shutting down gracefully , but the
>> executors are not.
>> IS this the problem?
>>
>> Thanks
>> Deepak
>>
>> On Thu, May 12, 2016 at 11:49 AM, Rakesh H (Marketing Platform-BLR) <
>> rakes...@flipkart.com> wrote:
>>
>>> Yes, it is set to true.
>>> Log of driver :
>>>
>>> 16/05/12 10:18:29 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: 
>>> SIGTERM
>>> 16/05/12 10:18:29 INFO streaming.StreamingContext: Invoking 
>>> stop(stopGracefully=true) from shutdown hook
>>> 16/05/12 10:18:29 INFO scheduler.JobGenerator: Stopping JobGenerator 
>>> gracefully
>>> 16/05/12 10:18:29 INFO scheduler.JobGenerator: Waiting for all received 
>>> blocks to be consumed for job generation
>>> 16/05/12 10:18:29 INFO scheduler.JobGenerator: Waited for all received 
>>> blocks to be consumed for job generation
>>>
>>> Log of executor:
>>> 16/05/12 10:18:29 ERROR executor.CoarseGrainedExecutorBackend: Driver 
>>> xx.xx.xx.xx:x disassociated! Shutting down.
>>> 16/05/12 10:18:29 WARN remote.ReliableDeliverySupervisor: Association 
>>> with remote system [xx.xx.xx.xx:x] has failed, address is now gated 
>>> for [5000] ms. Reason: [Disassociated]
>>> 16/05/12 10:18:29 INFO storage.DiskBlockManager: Shutdown hook called
>>> 16/05/12 10:18:29 INFO processors.StreamJobRunner$: VALUE 
>>> -> 204 //This is value i am logging
>>> 16/05/12 10:18:29 INFO util.ShutdownHookManager: Shutdown hook called
>>> 16/05/12 10:18:29 INFO processors.StreamJobRunner$: VALUE 
>>> -> 205
>>> 16/05/12 10:18:29 INFO processors.StreamJobRunner$: VALUE 
>>> -> 206
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, May 12, 2016 at 11:45 AM Deepak Sharma <
>>> deepakmc...@gmail.com> wrote:
>>>
 Hi Rakesh
 Did you tried setting *spark.streaming.stopGracefullyOnShutdown to
 true *for your spark configuration instance?
 If not try this , and let us know if this helps.

 Thanks
 Deepak

 On Thu, May 12, 2016 at 11:42 AM, Rakesh H (Marketing Platform-BLR)
  wrote:

> Issue i am having is similar to the one mentioned here :
>
> http://stackoverflow.com/questions/36911442/how-to-stop-gracefully-a-spark-streaming-application-on-yarn
>
> I am creating a rdd from sequence of 1 to 300 and creating
> streaming RDD out of it.
>
> val rdd = ssc.sparkContext.parallelize(1 to 300)
> val dstream = new ConstantInputDStream(ssc, rdd)
> dstream.foreachRDD{ rdd =>
>   rdd.foreach{ x =>
> log(x)

Re: Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread Sean Owen
I'm not sure what you're trying there. The return type is an RDD of
arrays, not of RDDs or of ArrayLists. There may be another catch but
that is not it.

On Fri, May 13, 2016 at 11:50 AM, Tom Godden  wrote:
> I believe it's an illegal cast. This is the line of code:
>> RDD> windowed =
>> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
> with vals being a JavaRDD.  Explicitly casting
> doesn't work either:
>> RDD> windowed = (RDD>)
>> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
> Did I miss something?
>
> On 13-05-16 09:44, Sean Owen wrote:
>> The problem is there's no Java-friendly version of this, and the Scala
>> API return type actually has no analog in Java (an array of any type,
>> not just of objects) so it becomes Object. You can just cast it to the
>> type you know it will be -- RDD or RDD or whatever.
>>
>> On Fri, May 13, 2016 at 8:40 AM, tgodden  wrote:
>>> Hello,
>>>
>>> We're trying to use PrefixSpan on sequential data, by passing a sliding
>>> window over it. Spark Streaming is not an option.
>>> RDDFunctions.sliding() returns an item of class RDD,
>>> regardless of the original type of the RDD. Because of this, the
>>> returned item seems to be pretty much worthless.
>>> Is this a bug/nyi? Is there a way to circumvent this somehow?
>>>
>>> Official docs:
>>> https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
>>>
>>> Thanks
>>>
>>> 
>>> View this message in context: Java: Return type of RDDFunctions.sliding(int,
>>> int)
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread Tom Godden
I believe it's an illegal cast. This is the line of code:
> RDD> windowed =
> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
with vals being a JavaRDD.  Explicitly casting
doesn't work either:
> RDD> windowed = (RDD>)
> RDDFunctions.fromRDD(vals.rdd(), vals.classTag()).sliding(20, 1);
Did I miss something?

On 13-05-16 09:44, Sean Owen wrote:
> The problem is there's no Java-friendly version of this, and the Scala
> API return type actually has no analog in Java (an array of any type,
> not just of objects) so it becomes Object. You can just cast it to the
> type you know it will be -- RDD or RDD or whatever.
>
> On Fri, May 13, 2016 at 8:40 AM, tgodden  wrote:
>> Hello,
>>
>> We're trying to use PrefixSpan on sequential data, by passing a sliding
>> window over it. Spark Streaming is not an option.
>> RDDFunctions.sliding() returns an item of class RDD,
>> regardless of the original type of the RDD. Because of this, the
>> returned item seems to be pretty much worthless.
>> Is this a bug/nyi? Is there a way to circumvent this somehow?
>>
>> Official docs:
>> https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
>>
>> Thanks
>>
>> 
>> View this message in context: Java: Return type of RDDFunctions.sliding(int,
>> int)
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: sbt for Spark build with Scala 2.11

2016-05-13 Thread Raghava Mutharaju
Thank you for the response.

I used the following command to build from source

build/mvn -Dhadoop.version=2.6.4 -Phadoop-2.6 -DskipTests clean package

Would this put in the required jars in .ivy2 during the build process? If
so, how can I make the spark distribution runnable, so that I can use it on
other machines as well (make-distribution.sh no longer exists in Spark root
folder)?

For compiling my application, I put in the following lines in the build.sbt

packAutoSettings
val spark = "org.apache.spark" %% "spark-core" % "2.0.0-SNAPSHOT"
val sparksql = "org.apache.spark" % "spark-sql_2.11" % "2.0.0-SNAPSHOT"

lazy val root = (project in file(".")).
  settings(
name := "sparkel",
version := "0.1.0",
scalaVersion := "2.11.8",
libraryDependencies += spark,
libraryDependencies += sparksql
  )


Regards,
Raghava.


On Fri, May 13, 2016 at 12:23 AM, Luciano Resende 
wrote:

> Spark has moved to build using Scala 2.11 by default in master/trunk.
>
> As for the 2.0.0-SNAPSHOT, it is actually the version of master/trunk and
> you might be missing some modules/profiles for your build. What command did
> you use to build ?
>
> On Thu, May 12, 2016 at 9:01 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Hello All,
>>
>> I built Spark from the source code available at
>> https://github.com/apache/spark/. Although I haven't specified the
>> "-Dscala-2.11" option (to build with Scala 2.11), from the build messages I
>> see that it ended up using Scala 2.11. Now, for my application sbt, what
>> should be the spark version? I tried the following
>>
>> val spark = "org.apache.spark" %% "spark-core" % "2.0.0-SNAPSHOT"
>> val sparksql = "org.apache.spark" % "spark-sql_2.11" % "2.0.0-SNAPSHOT"
>>
>> and scalaVersion := "2.11.8"
>>
>> But this setting of spark version gives sbt error
>>
>> unresolved dependency: org.apache.spark#spark-core_2.11;2.0.0-SNAPSHOT
>>
>> I guess this is because the repository doesn't contain 2.0.0-SNAPSHOT.
>> Does this mean, the only option is to put all the required jars in the lib
>> folder (unmanaged dependencies)?
>>
>> Regards,
>> Raghava.
>>
>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>



-- 
Regards,
Raghava
http://raghavam.github.io


ANOVA test in Spark

2016-05-13 Thread mayankshete
Is ANOVA present in Spark Mllib if not then, when will be this feature be
available in Spark ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ANOVA-test-in-Spark-tp26949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: S3A Creating Task Per Byte (pyspark / 1.6.1)

2016-05-13 Thread Steve Loughran

On 12 May 2016, at 18:35, Aaron Jackson wrote:

I'm using the spark 1.6.1 (hadoop-2.6) and I'm trying to load a file that's in 
s3.  I've done this previously with spark 1.5 with no issue.  Attempting to 
load and count a single file as follows:

dataFrame = sqlContext.read.text('s3a://bucket/path-to-file.csv')
dataFrame.count()

But when it attempts to load, it creates 279K tasks.  When I look at the tasks, 
the # of tasks is identical to the # of bytes in the file.  Has anyone seen 
anything like this or have any ideas why it's getting that granular?

yeah, seen that. The blocksize being returned by the FS is coming back as 0, 
which is then triggering a split on every byte. Which as you have noticed, 
doesn't work

you've hit https://issues.apache.org/jira/browse/HADOOP-11584 , fixed in Hadoop 
2.7.0

You need to consider S3A not usable in production in the 2.6.0 release; things
surfaced in the field which only got caught later.
https://issues.apache.org/jira/browse/HADOOP-11571 covered the issues that
surfaced. Stay on S3N for a 2.6.x based release, move to Hadoop 2.7.1+ for S3A
to be ready to play.




Re: Kafka partition increased while Spark Streaming is running

2016-05-13 Thread chandan prakash
Follow up question :

 If Spark Streaming is using checkpointing (/tmp/checkpointDir) for
AtLeastOnce and the number of topics and/or partitions has increased,

 will gracefully shutting down and restarting from the checkpoint consider the
new topics and/or partitions?
 If the answer is NO, then how can one start from the same checkpoint with the new
partitions/topics included?

Thanks,
Chandan


On Wed, Feb 24, 2016 at 9:30 PM, Cody Koeninger  wrote:

> That's correct, when you create a direct stream, you specify the
> topicpartitions you want to be a part of the stream (the other method for
> creating a direct stream is just a convenience wrapper).
>
> On Wed, Feb 24, 2016 at 2:15 AM, 陈宇航  wrote:
>
>> Here I use the *'KafkaUtils.createDirectStream'* to integrate Kafka with
>> Spark Streaming. I submitted the app, then I changed (increased) Kafka's
>> partition number after it's running for a while. Then I check the input
>> offset with '*rdd.asInstanceOf[HasOffsetRanges].offsetRanges*', seeing
>> that only the offset of the initial partitions are returned.
>>
>> Does this mean Spark Streaming's Kafka integration can't update its
>> parallelism when Kafka's partition number is changed?
>>
>
>


-- 
Chandan Prakash


Re: High virtual memory consumption on spark-submit client.

2016-05-13 Thread jone
No, I have set master to yarn-cluster.
While SparkPi is running, the result of free -t is as follows:
[running]mqq@10.205.3.29:/data/home/hive/conf$ free -t
 total   used   free shared    buffers cached
Mem:  32740732   32105684 635048  0 683332   28863456
-/+ buffers/cache:    2558896   30181836
Swap:  2088952  60320    2028632
Total:    34829684   32166004    2663680
After SparkPi succeeds, the result is as follows:
[running]mqq@10.205.3.29:/data/home/hive/conf$ free -t
 total   used   free shared    buffers cached
Mem:  32740732   31614452    1126280  0 683624   28863096
-/+ buffers/cache:    2067732   30673000
Swap:  2088952  60320    2028632
Total:    34829684   31674772    3154912
Mich Talebzadeh wrote on 13 May 2016 at 14:47:

Is this a standalone set up single host where executor runs inside the driver?

Also run free -t to see the virtual memory usage, which is basically swap space:

free -t
             total       used       free     shared    buffers     cached
Mem:      24546308   24268760     277548          0    1088236   15168668
-/+ buffers/cache:    8011856   16534452
Swap:      2031608        304    2031304
Total:    26577916   24269064    2308852

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com

 


On 13 May 2016 at 07:36, Jone Zhang wrote:

Mich, Do you want this
==
[running]mqq@10.205.3.29:/data/home/hive/conf$ ps aux | grep SparkPi
mqq      20070  3.6  0.8 10445048 267028 pts/16 Sl+ 13:09   0:11
/data/home/jdk/bin/java
-Dlog4j.configuration=file:///data/home/spark/conf/log4j.properties
-cp /data/home/spark/lib/*:/data/home/hadoop/share/hadoop/common/*:/data/home/hadoop/share/hadoop/common/lib/*:/data/home/hadoop/share/hadoop/yarn/*:/data/home/hadoop/share/hadoop/yarn/lib/*:/data/home/hadoop/share/hadoop/hdfs/*:/data/home/hadoop/share/hadoop/hdfs/lib/*:/data/home/hadoop/share/hadoop/tools/*:/data/home/hadoop/share/hadoop/mapreduce/*:/data/home/spark/conf/:/data/home/spark/lib/spark-assembly-1.4.1-hadoop2.5.1_150903.jar:/data/home/spark/lib/datanucleus-api-jdo-3.2.6.jar:/data/home/spark/lib/datanucleus-core-3.2.10.jar:/data/home/spark/lib/datanucleus-rdbms-3.2.9.jar:/data/home/hadoop/conf/:/data/home/hadoop/conf/:/data/home/spark/lib/*:/data/home/hadoop/share/hadoop/common/*:/data/home/hadoop/share/hadoop/common/lib/*:/data/home/hadoop/share/hadoop/yarn/*:/data/home/hadoop/share/hadoop/yarn/lib/*:/data/home/hadoop/share/hadoop/hdfs/*:/data/home/hadoop/share/hadoop/hdfs/lib/*:/data/home/hadoop/share/hadoop/tools/*:/data/home/hadoop/share/hadoop/mapreduce/*
-XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master
yarn-cluster --class org.apache.spark.examples.SparkPi --queue spark
--num-executors 4
/data/home/spark/lib/spark-examples-1.4.1-hadoop2.5.1.jar 1
mqq      22410  0.0  0.0 110600  1004 pts/8    S+   13:14   0:00 grep SparkPi
[running]mqq@10.205.3.29:/data/home/hive/conf$ top -p 20070

top - 13:14:48 up 504 days, 19:17, 19 users,  load average: 1.41, 1.10, 0.99
Tasks:   1 total,   0 running,   1 sleeping,   0 stopped,   0 zombie
Cpu(s): 18.1%us,  2.7%sy,  0.0%ni, 74.4%id,  4.5%wa,  0.0%hi,  0.2%si,  0.0%st
Mem:  32740732k total, 31606288k used,  113k free,   475908k buffers
Swap:  2088952k total,    61076k used,  2027876k free, 27594452k cached

  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
20070 mqq       20   0 10.0g 260m  32m S  0.0  0.8   0:11.38 java
==

Harsh, physical cpu cores is 1, virtual cpu cores is 4

Thanks.

2016-05-13 13:08 GMT+08:00, Harsh J :
> How many CPU cores are on that machine? Read http://qr.ae/8Uv3Xq
>
> You can also confirm the above by running the pmap utility on your process
> and most of the virtual memory would be under 'anon'.
>
> On Fri, 13 May 2016 09:11 jone,  wrote:
>
>> The virtual memory is 9G When i run org.apache.spark.examples.SparkPi
>> under yarn-cluster model,which using default configurations.
>>   PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+
>> COMMAND
>>
>> 4519 mqq       20   0 9041m 248m  26m S  0.3  0.8   0:19.85
>> java
>>  I am curious why is so high?
>>
>> Thanks.
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




The metastore database gives errors when start spark-sql CLI.

2016-05-13 Thread Joseph

Hi all,

I use PostgreSQL to store the hive metadata. 

First, I imported a sql script to metastore database as follows:
psql -U postgres -d metastore -h 192.168.50.30 -f 
hive-schema-1.2.0.postgres.sql

Then, when I started $SPARK_HOME/bin/spark-sql,  the PostgreSQL  gave the 
following errors:

ERROR:  syntax error at or near "@@" at character 5
STATEMENT:  SET @@session.sql_mode=ANSI_QUOTES
ERROR:  relation "v$instance" does not exist at character 21
STATEMENT:  SELECT version FROM v$instance
ERROR:  column "version" does not exist at character 10
STATEMENT:  SELECT @@version

This does not affect normal use, but maybe it is a bug! ( I use spark 1.6.1 and 
 hive 1.2.1)



Joseph


Re: Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread Sean Owen
The problem is there's no Java-friendly version of this, and the Scala
API return type actually has no analog in Java (an array of any type,
not just of objects) so it becomes Object. You can just cast it to the
type you know it will be -- RDD or RDD or whatever.

On Fri, May 13, 2016 at 8:40 AM, tgodden  wrote:
> Hello,
>
> We're trying to use PrefixSpan on sequential data, by passing a sliding
> window over it. Spark Streaming is not an option.
> RDDFunctions.sliding() returns an item of class RDD,
> regardless of the original type of the RDD. Because of this, the
> returned item seems to be pretty much worthless.
> Is this a bug/nyi? Is there a way to circumvent this somehow?
>
> Official docs:
> https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html
>
> Thanks
>
> 
> View this message in context: Java: Return type of RDDFunctions.sliding(int,
> int)
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Java: Return type of RDDFunctions.sliding(int, int)

2016-05-13 Thread tgodden
Hello,

We're trying to use PrefixSpan on sequential data, by passing a sliding
window over it. Spark Streaming is not an option.
RDDFunctions.sliding() returns an item of class RDD,
regardless of the original type of the RDD. Because of this, the
returned item seems to be pretty much worthless.
Is this a bug/nyi? Is there a way to circumvent this somehow?

Official docs:
https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Java-Return-type-of-RDDFunctions-sliding-int-int-tp26948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark handling spill overs

2016-05-13 Thread Mich Talebzadeh
Spill-overs are a common issue for in-memory computing systems; after all,
memory is limited. In Spark, where RDDs are immutable, if an RDD is created
with a size greater than half the node's RAM, then a transformation that
generates the consequent RDD can potentially fill all of the node's memory,
which can cause spill-over into swap space.
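
As an illustration only (an untested sketch; the input path and transformation are
made up), one common way to keep large RDDs from pushing a node into swap is to cache
them with a storage level that spills to Spark's local disk store:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Sketch only: MEMORY_AND_DISK_SER keeps what fits in memory (serialized)
// and writes the remaining partitions to local disk instead of OS swap.
def cacheWithSpill(sc: SparkContext) = {
  val bigRDD = sc.textFile("hdfs:///some/large/input").map(_.toUpperCase)
  bigRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
  bigRDD.count()   // materialise once; later actions reuse the cached blocks
  bigRDD
}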

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 13 May 2016 at 00:38, Takeshi Yamamuro  wrote:

> Hi,
>
> Which version of Spark you use?
> The recent one cannot handle this kind of spilling, see:
> http://spark.apache.org/docs/latest/tuning.html#memory-management-overview
> .
>
> // maropu
>
> On Fri, May 13, 2016 at 8:07 AM, Ashok Kumar  > wrote:
>
>> Hi,
>>
>> How one can avoid having Spark spill over after filling the node's memory.
>>
>> Thanks
>>
>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Graceful shutdown of spark streaming on yarn

2016-05-13 Thread Deepak Sharma
Rakesh
Have you used awaitTermination() on your ssc ?
If not, add this and see if it changes the behavior.
I am guessing this issue may be related to yarn deployment mode.
Also try setting the deployment mode to yarn-client.
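
For reference, a bare-bones sketch of that pattern (untested; the app name and batch
interval below are placeholders, not from the original job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: ask the built-in shutdown hook to stop streaming gracefully,
// then block in awaitTermination() until the context is actually stopped.
val conf = new SparkConf()
  .setAppName("graceful-shutdown-sketch")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(10))
// ... define the DStream pipeline here ...
ssc.start()
ssc.awaitTermination()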

Thanks
Deepak


On Fri, May 13, 2016 at 10:17 AM, Rakesh H (Marketing Platform-BLR) <
rakes...@flipkart.com> wrote:

> Ping!!
> Has anybody tested graceful shutdown of a spark streaming in yarn-cluster
> mode?It looks like a defect to me.
>
>
> On Thu, May 12, 2016 at 12:53 PM Rakesh H (Marketing Platform-BLR) <
> rakes...@flipkart.com> wrote:
>
>> We are on spark 1.5.1
>> Above change was to add a shutdown hook.
>> I am not adding shutdown hook in code, so inbuilt shutdown hook is being
>> called.
>> Driver signals that it is going to to graceful shutdown, but executor
>> sees that Driver is dead and it shuts down abruptly.
>> Could this issue be related to yarn? I see correct behavior locally. I
>> did "yarn kill " to kill the job.
>>
>>
>> On Thu, May 12, 2016 at 12:28 PM Deepak Sharma 
>> wrote:
>>
>>> This is happening because spark context shuts down without shutting down
>>> the ssc first.
>>> This was behavior till spark 1.4 and was addressed in later releases.
>>> https://github.com/apache/spark/pull/6307
>>>
>>> Which version of spark are you on?
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Thu, May 12, 2016 at 12:14 PM, Rakesh H (Marketing Platform-BLR) <
>>> rakes...@flipkart.com> wrote:
>>>
 Yes, it seems to be the case.
 In this case executors should have continued logging values till 300,
 but they are shutdown as soon as i do "yarn kill .."

 On Thu, May 12, 2016 at 12:11 PM Deepak Sharma 
 wrote:

> So in your case , the driver is shutting down gracefully , but the
> executors are not.
> IS this the problem?
>
> Thanks
> Deepak
>
> On Thu, May 12, 2016 at 11:49 AM, Rakesh H (Marketing Platform-BLR) <
> rakes...@flipkart.com> wrote:
>
>> Yes, it is set to true.
>> Log of driver :
>>
>> 16/05/12 10:18:29 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: 
>> SIGTERM
>> 16/05/12 10:18:29 INFO streaming.StreamingContext: Invoking 
>> stop(stopGracefully=true) from shutdown hook
>> 16/05/12 10:18:29 INFO scheduler.JobGenerator: Stopping JobGenerator 
>> gracefully
>> 16/05/12 10:18:29 INFO scheduler.JobGenerator: Waiting for all received 
>> blocks to be consumed for job generation
>> 16/05/12 10:18:29 INFO scheduler.JobGenerator: Waited for all received 
>> blocks to be consumed for job generation
>>
>> Log of executor:
>> 16/05/12 10:18:29 ERROR executor.CoarseGrainedExecutorBackend: Driver 
>> xx.xx.xx.xx:x disassociated! Shutting down.
>> 16/05/12 10:18:29 WARN remote.ReliableDeliverySupervisor: Association 
>> with remote system [xx.xx.xx.xx:x] has failed, address is now gated 
>> for [5000] ms. Reason: [Disassociated]
>> 16/05/12 10:18:29 INFO storage.DiskBlockManager: Shutdown hook called
>> 16/05/12 10:18:29 INFO processors.StreamJobRunner$: VALUE -> 
>> 204 //This is value i am logging
>> 16/05/12 10:18:29 INFO util.ShutdownHookManager: Shutdown hook called
>> 16/05/12 10:18:29 INFO processors.StreamJobRunner$: VALUE -> 
>> 205
>> 16/05/12 10:18:29 INFO processors.StreamJobRunner$: VALUE -> 
>> 206
>>
>>
>>
>>
>>
>>
>> On Thu, May 12, 2016 at 11:45 AM Deepak Sharma 
>> wrote:
>>
>>> Hi Rakesh
>>> Did you tried setting *spark.streaming.stopGracefullyOnShutdown to
>>> true *for your spark configuration instance?
>>> If not try this , and let us know if this helps.
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Thu, May 12, 2016 at 11:42 AM, Rakesh H (Marketing Platform-BLR)
>>>  wrote:
>>>
 Issue i am having is similar to the one mentioned here :

 http://stackoverflow.com/questions/36911442/how-to-stop-gracefully-a-spark-streaming-application-on-yarn

 I am creating a rdd from sequence of 1 to 300 and creating
 streaming RDD out of it.

 val rdd = ssc.sparkContext.parallelize(1 to 300)
 val dstream = new ConstantInputDStream(ssc, rdd)
 dstream.foreachRDD{ rdd =>
   rdd.foreach{ x =>
 log(x)
 Thread.sleep(50)
   }
 }


 When i kill this job, i expect elements 1 to 300 to be logged
 before shutting down. It is indeed the case when i run it locally. It 
 wait
 for the job to finish before shutting down.

 But when i launch the job in custer with "yarn-cluster" mode, it
 abruptly shuts down.
 Executor prints following log

 ERROR executor.CoarseGrainedExecutorBackend: