Re: Need Help in Spark Hive Data Processing

2016-01-06 Thread Jeff Zhang
It depends on how you fetch the single row. Is your query complex?

On Thu, Jan 7, 2016 at 12:47 PM, Balaraju.Kagidala Kagidala <
balaraju.kagid...@gmail.com> wrote:

> Hi ,
>
>   I am a new user to Spark. I am trying to use Spark to process huge Hive
> data using Spark DataFrames.
>
>
> I have a 5-node Spark cluster, each node with 30 GB of memory. I want to
> process a Hive table with 450 GB of data using DataFrames. Fetching a single
> row from the Hive table takes 36 minutes. Please suggest what is wrong here;
> any help is appreciated.
>
>
> Thanks
> Bala
>
>
>


-- 
Best Regards

Jeff Zhang


Can spark.scheduler.pool be applied globally ?

2016-01-05 Thread Jeff Zhang
It seems that currently spark.scheduler.pool must be set as a local property
(associated with a thread). Is there any reason why spark.scheduler.pool cannot
be used globally? My scenario is that I want my thriftserver started with the
fair scheduler as the default pool, without using a set command to set the
pool. Is there any way to do that? Or am I missing anything here?
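
A minimal sketch of the per-thread workaround that exists today ("myPool" is a
hypothetical pool name, usually declared in fairscheduler.xml):

sc.setLocalProperty("spark.scheduler.pool", "myPool")  // jobs from this thread go to myPool
// ... run jobs from this thread ...
sc.setLocalProperty("spark.scheduler.pool", null)      // back to the default pool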

-- 
Best Regards

Jeff Zhang


Re: Can spark.scheduler.pool be applied globally ?

2016-01-05 Thread Jeff Zhang
Thanks Mark, a custom configuration file would be better for me. Changing the
code would affect all applications, which is too risky for me.
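
For the archive: the configuration-file route Mark describes amounts to
declaring a pool named "default" with FAIR mode in fairscheduler.xml (or in
whatever file spark.scheduler.allocation.file points to). A minimal sketch of
such a file; weight and minShare are shown with their default values:

<?xml version="1.0"?>
<allocations>
  <pool name="default">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>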



On Wed, Jan 6, 2016 at 10:50 AM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> The other way to do it is to build a custom version of Spark where you
> have changed the value of DEFAULT_SCHEDULING_MODE -- and if you were
> paying close attention, I accidentally let it slip that that is what I've
> done.  I previously wrote "schedulingMode = DEFAULT_SCHEDULING_MODE --
> i.e. SchedulingMode.FAIR", but that should actually be SchedulingMode.FIFO
> if you haven't changed the code:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala#L65
>
> On Tue, Jan 5, 2016 at 5:29 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> Right, I can override the root pool in configuration file, Thanks Mark.
>>
>> On Wed, Jan 6, 2016 at 8:45 AM, Mark Hamstra <m...@clearstorydata.com>
>> wrote:
>>
>>> Just configure a pool named "default" with schedulingMode FAIR in
>>> fairscheduler.xml (or in the file given by spark.scheduler.allocation.file
>>> if you have overridden the default name
>>> for the config file.)  `buildDefaultPool()` will only build the pool named
>>> "default" with the default properties (such as schedulingMode =
>>> DEFAULT_SCHEDULING_MODE -- i.e. SchedulingMode.FAIR) if that pool name is
>>> not already built (
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala#L90
>>> ).
>>>
>>>
>>> On Tue, Jan 5, 2016 at 4:15 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> Sorry, I don't make it clearly. What I want is the default pool is fair
>>>> scheduling. But seems if I want to use fair scheduling now, I have to set
>>>> spark.scheduler.pool explicitly.
>>>>
>>>> On Wed, Jan 6, 2016 at 2:03 AM, Mark Hamstra <m...@clearstorydata.com>
>>>> wrote:
>>>>
>>>>> I don't understand.  If you're using fair scheduling and don't set a
>>>>> pool, the default pool will be used.
>>>>>
>>>>> On Tue, Jan 5, 2016 at 1:57 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> It seems currently spark.scheduler.pool must be set as
>>>>>> localProperties (associate with thread). Any reason why
>>>>>> spark.scheduler.pool can not be used globally.  My scenario is that I 
>>>>>> want
>>>>>> my thriftserver started with fair scheduler as the default pool without
>>>>>> using set command to set the pool. Is there anyway to do that ? Or do I
>>>>>> miss anything here ?
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: Can spark.scheduler.pool be applied globally ?

2016-01-05 Thread Jeff Zhang
Right, I can override the root pool in the configuration file. Thanks, Mark.

On Wed, Jan 6, 2016 at 8:45 AM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> Just configure a pool named "default" with schedulingMode FAIR in
> fairscheduler.xml (or in the file given by spark.scheduler.allocation.file
> if you have overridden the default name
> for the config file.)  `buildDefaultPool()` will only build the pool named
> "default" with the default properties (such as schedulingMode =
> DEFAULT_SCHEDULING_MODE -- i.e. SchedulingMode.FAIR) if that pool name is
> not already built (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala#L90
> ).
>
>
> On Tue, Jan 5, 2016 at 4:15 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> Sorry, I don't make it clearly. What I want is the default pool is fair
>> scheduling. But seems if I want to use fair scheduling now, I have to set
>> spark.scheduler.pool explicitly.
>>
>> On Wed, Jan 6, 2016 at 2:03 AM, Mark Hamstra <m...@clearstorydata.com>
>> wrote:
>>
>>> I don't understand.  If you're using fair scheduling and don't set a
>>> pool, the default pool will be used.
>>>
>>> On Tue, Jan 5, 2016 at 1:57 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>>
>>>> It seems currently spark.scheduler.pool must be set as localProperties
>>>> (associate with thread). Any reason why spark.scheduler.pool can not be
>>>> used globally.  My scenario is that I want my thriftserver started with
>>>> fair scheduler as the default pool without using set command to set the
>>>> pool. Is there anyway to do that ? Or do I miss anything here ?
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: [discuss] dropping Python 2.6 support

2016-01-05 Thread Jeff Zhang
>>>>>> On Tue, Jan 5, 2016 at 5:19 PM, Nicholas Chammas <
>>>>>>>>>> nicholas.cham...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> As I pointed out in my earlier email, RHEL will support Python
>>>>>>>>>>> 2.6 until 2020. So I'm assuming these large companies will have the 
>>>>>>>>>>> option
>>>>>>>>>>> of riding out Python 2.6 until then.
>>>>>>>>>>>
>>>>>>>>>>> Are we seriously saying that Spark should likewise support
>>>>>>>>>>> Python 2.6 for the next several years? Even though the core Python 
>>>>>>>>>>> devs
>>>>>>>>>>> stopped supporting it in 2013?
>>>>>>>>>>>
>>>>>>>>>>> If that's not what we're suggesting, then when, roughly, can we
>>>>>>>>>>> drop support? What are the criteria?
>>>>>>>>>>>
>>>>>>>>>>> I understand the practical concern here. If companies are stuck
>>>>>>>>>>> using 2.6, it doesn't matter to them that it is deprecated. But 
>>>>>>>>>>> balancing
>>>>>>>>>>> that concern against the maintenance burden on this project, I 
>>>>>>>>>>> would say
>>>>>>>>>>> that "upgrade to Python 2.7 or stay on Spark 1.6.x" is a reasonable
>>>>>>>>>>> position to take. There are many tiny annoyances one has to put up 
>>>>>>>>>>> with to
>>>>>>>>>>> support 2.6.
>>>>>>>>>>>
>>>>>>>>>>> I suppose if our main PySpark contributors are fine putting up
>>>>>>>>>>> with those annoyances, then maybe we don't need to drop support 
>>>>>>>>>>> just yet...
>>>>>>>>>>>
>>>>>>>>>>> Nick
>>>>>>>>>>> On Tue, Jan 5, 2016 at 2:27 PM, Julio Antonio Soto de Vicente <
>>>>>>>>>>> ju...@esbet.es> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately, Koert is right.
>>>>>>>>>>>>
>>>>>>>>>>>> I've been in a couple of projects using Spark (banking
>>>>>>>>>>>> industry) where CentOS + Python 2.6 is the toolbox available.
>>>>>>>>>>>>
>>>>>>>>>>>> That said, I believe it should not be a concern for Spark.
>>>>>>>>>>>> Python 2.6 is old and busted, which is totally opposite to the 
>>>>>>>>>>>> Spark
>>>>>>>>>>>> philosophy IMO.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> El 5 ene 2016, a las 20:07, Koert Kuipers <ko...@tresata.com>
>>>>>>>>>>>> escribió:
>>>>>>>>>>>>
>>>>>>>>>>>> rhel/centos 6 ships with python 2.6, doesnt it?
>>>>>>>>>>>>
>>>>>>>>>>>> if so, i still know plenty of large companies where python 2.6
>>>>>>>>>>>> is the only option. asking them for python 2.7 is not going to work
>>>>>>>>>>>>
>>>>>>>>>>>> so i think its a bad idea
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jan 5, 2016 at 1:52 PM, Juliet Hougland <
>>>>>>>>>>>> juliet.hougl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I don't see a reason Spark 2.0 would need to support Python
>>>>>>>>>>>>> 2.6. At this point, Python 3 should be the default that is 
>>>>>>>>>>>>> encouraged.
>>>>>>>>>>>>> Most organizations acknowledge the 2.7 is common, but lagging
>>>>>>>>>>>>> behind the version they should theoretically use. Dropping python 
>>>>>>>>>>>>> 2.6
>>>>>>>>>>>>> support sounds very reasonable to me.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jan 5, 2016 at 5:45 AM, Nicholas Chammas <
>>>>>>>>>>>>> nicholas.cham...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> +1
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Red Hat supports Python 2.6 on RHEL 5 until 2020
>>>>>>>>>>>>>> <https://alexgaynor.net/2015/mar/30/red-hat-open-source-community/>,
>>>>>>>>>>>>>> but otherwise yes, Python 2.6 is ancient history and the core 
>>>>>>>>>>>>>> Python
>>>>>>>>>>>>>> developers stopped supporting it in 2013. RHEL 5 is not a good 
>>>>>>>>>>>>>> enough
>>>>>>>>>>>>>> reason to continue support for Python 2.6 IMO.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We should aim to support Python 2.7 and Python 3.3+ (which I
>>>>>>>>>>>>>> believe we currently do).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Nick
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jan 5, 2016 at 8:01 AM Allen Zhang <
>>>>>>>>>>>>>> allenzhang...@126.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> plus 1,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> we are currently using python 2.7.2 in production
>>>>>>>>>>>>>>> environment.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 2016-01-05 18:11:45, "Meethu Mathew" <
>>>>>>>>>>>>>>> meethu.mat...@flytxt.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> +1
>>>>>>>>>>>>>>> We use Python 2.7
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Meethu Mathew
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jan 5, 2016 at 12:47 PM, Reynold Xin <
>>>>>>>>>>>>>>> r...@databricks.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Does anybody here care about us dropping support for Python
>>>>>>>>>>>>>>>> 2.6 in Spark 2.0?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Python 2.6 is ancient, and is pretty slow in many aspects
>>>>>>>>>>>>>>>> (e.g. json parsing) when compared with Python 2.7. Some 
>>>>>>>>>>>>>>>> libraries that
>>>>>>>>>>>>>>>> Spark depend on stopped supporting 2.6. We can still convince 
>>>>>>>>>>>>>>>> the library
>>>>>>>>>>>>>>>> maintainers to support 2.6, but it will be extra work. I'm 
>>>>>>>>>>>>>>>> curious if
>>>>>>>>>>>>>>>> anybody still uses Python 2.6 to run Spark.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>>
>>>
>>
>


-- 
Best Regards

Jeff Zhang


Re: Can spark.scheduler.pool be applied globally ?

2016-01-05 Thread Jeff Zhang
Sorry, I didn't make it clear. What I want is for the default pool to use fair
scheduling. But it seems that if I want to use fair scheduling now, I have to
set spark.scheduler.pool explicitly.

On Wed, Jan 6, 2016 at 2:03 AM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> I don't understand.  If you're using fair scheduling and don't set a pool,
> the default pool will be used.
>
> On Tue, Jan 5, 2016 at 1:57 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>>
>> It seems currently spark.scheduler.pool must be set as localProperties
>> (associate with thread). Any reason why spark.scheduler.pool can not be
>> used globally.  My scenario is that I want my thriftserver started with
>> fair scheduler as the default pool without using set command to set the
>> pool. Is there anyway to do that ? Or do I miss anything here ?
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: sql:Exception in thread "main" scala.MatchError: StringType

2016-01-03 Thread Jeff Zhang
Spark only supports one JSON object per line. You need to reformat your
file.
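
Concretely, sqlContext.read.json expects one complete JSON object per line (the
"JSON Lines" format), not a single pretty-printed document spanning many lines.
A minimal sketch with hypothetical paths; the second part shows one way to
flatten a pretty-printed file by reading it whole and stripping the newlines:

// one-object-per-line.json contains lines like:
//   {"firstName": "Brett", "lastName": "McLaughlin", "email": ""}
//   {"firstName": "Jason", "lastName": "Hunter", "email": ""}
val df = sqlContext.read.json("/path/to/one-object-per-line.json")

// workaround for a pretty-printed file (assumes each file fits in memory):
val flattened = sc.wholeTextFiles("/path/to/pretty.json")
  .map { case (_, content) => content.replace("\n", " ") }
val df2 = sqlContext.read.json(flattened)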

On Mon, Jan 4, 2016 at 11:26 AM, Bonsen <hengbohe...@126.com> wrote:

> (sbt) scala:
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql
> object SimpleApp {
>   def main(args: Array[String]) {
> val conf = new SparkConf()
> conf.setAppName("mytest").setMaster("spark://Master:7077")
> val sc = new SparkContext(conf)
> val sqlContext = new sql.SQLContext(sc)
> val d =
> sqlContext.read.json("/home/hadoop/2015data_test/Data/Data/100808cb11e9898816ef15fcdde4e1d74cbc0b/Db6Jh2XeQ.json")
> sc.stop()
>   }
> }
>
> __
> after sbt package :
> ./spark-submit --class "SimpleApp"
>
> /home/hadoop/Downloads/sbt/bin/target/scala-2.10/simple-project_2.10-1.0.jar
>
> ___
> json fIle:
> {
> "programmers": [
> {
> "firstName": "Brett",
> "lastName": "McLaughlin",
> "email": ""
> },
> {
> "firstName": "Jason",
> "lastName": "Hunter",
> "email": ""
> },
> {
> "firstName": "Elliotte",
> "lastName": "Harold",
> "email": ""
> }
> ],
> "authors": [
> {
> "firstName": "Isaac",
> "lastName": "Asimov",
> "genre": "sciencefiction"
> },
> {
> "firstName": "Tad",
> "lastName": "Williams",
> "genre": "fantasy"
> },
> {
> "firstName": "Frank",
> "lastName": "Peretti",
> "genre": "christianfiction"
> }
> ],
> "musicians": [
> {
> "firstName": "Eric",
> "lastName": "Clapton",
> "instrument": "guitar"
> },
> {
> "firstName": "Sergei",
> "lastName": "Rachmaninoff",
> "instrument": "piano"
> }
> ]
> }
>
> ___
> Exception in thread "main" scala.MatchError: StringType (of class
> org.apache.spark.sql.types.StringType$)
> at
> org.apache.spark.sql.json.InferSchema$.apply(InferSchema.scala:58)
> at
>
> org.apache.spark.sql.json.JSONRelation$$anonfun$schema$1.apply(JSONRelation.scala:139)
>
> ___
> why
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/sql-Exception-in-thread-main-scala-MatchError-StringType-tp25868.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Cannot get repartitioning to work

2016-01-01 Thread Jeff Zhang
You are using the wrong RDD; use the returned RDD as follows:

val repartitionedRDD = results.repartition(20)
println(repartitionedRDD.partitions.size)

On Sat, Jan 2, 2016 at 10:38 AM, jimitkr <ji...@softpath.net> wrote:

> Hi,
>
> I'm trying to test some custom parallelism and repartitioning in spark.
>
> First, i reduce my RDD (forcing creation of 10 partitions for the same).
>
> I then repartition the data to 20 partitions and print out the number of
> partitions, but i always get 10. Looks like the repartition command is
> getting ignored.
>
> How do i get repartitioning to work? See code below:
>
>   val
> results=input.reduceByKey((x,y)=>x+y,10).persist(StorageLevel.DISK_ONLY)
> results.repartition(20)
> println(results.partitions.size)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-get-repartitioning-to-work-tp25852.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Is there anyway to log properties from a Spark application

2015-12-28 Thread Jeff Zhang
Set spark.logConf to true in spark-defaults.conf and the properties will be
logged on the driver side. But it will only log the properties you set, not
the properties with default values.
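
For reference, a small sketch of the driver-side alternative (the explicitly
set values can also be dumped from the program itself; defaults still won't
appear):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().set("spark.logConf", "true")  // same effect as spark-defaults.conf
val sc = new SparkContext(conf)
println(sc.getConf.toDebugString)  // prints the properties that were explicitly set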


On Mon, Dec 28, 2015 at 8:18 PM, alvarobrandon <alvarobran...@gmail.com>
wrote:

> Hello:
>
> I was wondering if its possible to log properties from Spark Applications
> like spark.yarn.am.memory, spark.driver.cores,
> spark.reducer.maxSizeInFlight
> without having to access the SparkConf object programmatically. I'm trying
> to find some kind of log file that has traces of the execution of Spark
> apps
> and its parameters.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-anyway-to-log-properties-from-a-Spark-application-tp25820.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Is there anyway to log properties from a Spark application

2015-12-28 Thread Jeff Zhang
If you run it in yarn-client mode, it will be in the client-side log. If it is
yarn-cluster mode, it will be logged in the AM container (the first
container).


On Mon, Dec 28, 2015 at 8:30 PM, Alvaro Brandon <alvarobran...@gmail.com>
wrote:

> Thanks for the swift response.
>
> I'm launching my applications through YARN. Where will these properties be
> logged?. I guess they wont be part of YARN logs
>
> 2015-12-28 13:22 GMT+01:00 Jeff Zhang <zjf...@gmail.com>:
>
>> set spark.logConf as true in spark-default.conf will log the property in
>> driver side. But it would only log the property you set, not including the
>> properties with default value.
>>
>>
>> On Mon, Dec 28, 2015 at 8:18 PM, alvarobrandon <alvarobran...@gmail.com>
>> wrote:
>>
>>> Hello:
>>>
>>> I was wondering if its possible to log properties from Spark Applications
>>> like spark.yarn.am.memory, spark.driver.cores,
>>> spark.reducer.maxSizeInFlight
>>> without having to access the SparkConf object programmatically. I'm
>>> trying
>>> to find some kind of log file that has traces of the execution of Spark
>>> apps
>>> and its parameters.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-anyway-to-log-properties-from-a-Spark-application-tp25820.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -----
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: Passing parameters to spark SQL

2015-12-27 Thread Jeff Zhang
You can do it using Scala string interpolation:

http://docs.scala-lang.org/overviews/core/string-interpolation.html
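
A minimal sketch of what that looks like (table and column names are
hypothetical). Note that interpolation only handles parameter substitution; it
does not protect against SQL injection, so values still have to be validated
by the caller:

val minAge = 21
val df = sqlContext.sql(s"SELECT name, age FROM people WHERE age >= $minAge")
df.show()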

On Mon, Dec 28, 2015 at 5:11 AM, Ajaxx <ajack...@pobox.com> wrote:

> Given a SQLContext (or HiveContext) is it possible to pass in parameters
> to a
> query.  There are several reasons why this makes sense, including loss of
> data type during conversion to string, SQL injection, etc.
>
> But currently, it appears that SQLContext.sql() only takes a single
> parameter which is a string.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Passing-parameters-to-spark-SQL-tp25806.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Opening Dynamic Scaling Executors on Yarn

2015-12-27 Thread Jeff Zhang
See
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
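
For reference, a minimal sketch of the settings involved (names as in the
linked docs), set here on a SparkConf. The executor bounds are made-up values,
and the external shuffle service also has to be enabled as an auxiliary
service in the YARN NodeManagers:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")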



On Mon, Dec 28, 2015 at 2:00 PM, 顾亮亮 <guliangli...@qiyi.com> wrote:

> Hi all,
>
>
>
> SPARK-3174 (https://issues.apache.org/jira/browse/SPARK-3174) is a useful
> feature to save resources on yarn.
>
> We want to open this feature on our yarn cluster.
>
> I have a question about the version of shuffle service.
>
>
>
> I’m now using spark-1.5.1 (shuffle service).
>
> If I want to upgrade to spark-1.6.0, should I replace the shuffle service
> jar and restart all the namenode on yarn ?
>
>
>
> Thanks a lot.
>
>
>
> Mars
>
>
>



-- 
Best Regards

Jeff Zhang


Re: Can anyone explain Spark behavior for below? Kudos in Advance

2015-12-27 Thread Jeff Zhang
Not sure what you're trying to do, but the result is correct.

Scenario 2:

Partition 1 ("12", "23")
("","12") => "0"
("0","23") => "1"

Partition 2 ("","345")
("","") => "0"
("0","345") => "1"

Final merge:
("1","1") => "11"




On Mon, Dec 28, 2015 at 7:14 AM, Prem Spark <sparksure...@gmail.com> wrote:

> Scenario1:
> val z = sc.parallelize(List("12","23","345",""),2)
> z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x
> + y)
> res143: String = 10
>
> Scenario2:
> val z = sc.parallelize(List("12","23","","345"),2)
> z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x
> + y)
> res144: String = 11
>
> why the result is different . I was expecting 10 for both. also for the
> first Partition
>



-- 
Best Regards

Jeff Zhang


Re: should I file a bug? Re: trouble implementing Transformer and calling DataFrame.withColumn()

2015-12-22 Thread Jeff Zhang
ing df.withColumn()");
>
> transformerdDF.printSchema();
>
> logger.info("show() after calling df.withColumn()");
>
> transformerdDF.show();
>
>
> logger.info("END");
>
> }
>
>
> DataFrame createData() {
>
> Features f1 = new Features(1, category1);
>
> Features f2 = new Features(2, category2);
>
> ArrayList data = new ArrayList(2);
>
> data.add(f1);
>
> data.add(f2);
>
> //JavaRDD rdd =
> javaSparkContext.parallelize(Arrays.asList(f1, f2)); // does not work
>
> JavaRDD rdd = javaSparkContext.parallelize(data);
>
> DataFrame df = sqlContext.createDataFrame(rdd, Features.class);
>
> return df;
>
> }
>
>
> class MyUDF implements UDF1<String, String> {
>
> @Override
>
> public String call(String s) throws Exception {
>
> logger.info("AEDWIP s:{}", s);
>
> String ret = s.equalsIgnoreCase(category1) ?  category1 :
> category3;
>
>     return ret;
>
> }
>
> }
>
>
> public class Features implements Serializable{
>
> private static final long serialVersionUID = 1L;
>
> int id;
>
> String labelStr;
>
>
> Features(int id, String l) {
>
> this.id = id;
>
> this.labelStr = l;
>
> }
>
>
> public int getId() {
>
> return id;
>
> }
>
>
> public void setId(int id) {
>
> this.id = id;
>
> }
>
>
> public String getLabelStr() {
>
> return labelStr;
>
> }
>
>
> public void setLabelStr(String labelStr) {
>
> this.labelStr = labelStr;
>
> }
>
> }
>
>
>
> From: Andrew Davidson <a...@santacruzintegration.com>
> Date: Monday, December 21, 2015 at 7:47 PM
> To: Jeff Zhang <zjf...@gmail.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: trouble implementing Transformer and calling
> DataFrame.withColumn()
>
> Hi Jeff
>
> I took a look at Tokenizer.cal, UnaryTransformer.scala, and
> Transformer.scala.  How ever I can not figure out how implement 
> createTransformFunc()
> in Java 8.
>
> It would be nice to be able to use this transformer in my pipe line but
> not required. The real problem is I can not figure out how to create a
> Column I can pass to dataFrame.withColumn() in my Java code. Here is my
> original python
>
> binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else 
> “signal", StringType())
> ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"]))
>
>
> Any suggestions would be greatly appreciated.
>
> Andy
>
> public class LabelToBinaryTransformer
>
> extends UnaryTransformer<String, String,
> LabelToBinaryTransformer> {
>
> private static final long serialVersionUID = 4202800448830968904L;
>
> private  final UUID uid = UUID.randomUUID();
>
>
> @Override
>
> public String uid() {
>
> return uid.toString();
>
> }
>
>
> @Override
>
> public Function1<String, String> createTransformFunc() {
>
> // original python code
>
> // binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else
> “signal", StringType())
>
> Function1 interface is not easy to implement lots of functions
>
> ???
>
> }
>
>
> @Override
>
> public DataType outputDataType() {
>
> StringType ret = new StringType();
>
> return ret;
>
> }
>
>
>
> }
>
>
> From: Jeff Zhang <zjf...@gmail.com>
> Date: Monday, December 21, 2015 at 6:43 PM
> To: Andrew Davidson <a...@santacruzintegration.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: trouble implementing Transformer and calling
> DataFrame.withColumn()
>
> In your case, I would suggest you to extends UnaryTransformer which is
> much easier.
>
> Yeah, I have to admit that there's no document about how to write a custom
> Transformer, I think we need to add that, since writing custom Transformer
> is a very typical work in machine learning.
>
> On Tue, Dec 22, 2015 at 9:54 AM, Andy Davidson <
> a...@santacruzintegration.com> wrote:
>
>>
>> I am trying to port the following python function to Java 8. I would like
>> my java implementation to implement Transform

Re: Missing dependencies when submitting scala app

2015-12-22 Thread Jeff Zhang
It might be a jar conflict issue. Spark has a dependency on org.json4s.jackson;
do you also specify org.json4s.jackson in your sbt dependencies, but with a
different version?

On Wed, Dec 23, 2015 at 6:15 AM, Daniel Valdivia <h...@danielvaldivia.com>
wrote:

> Hi,
>
> I'm trying to figure out how to bundle dependencies with a Scala
> application. So far my code was tested successfully in the spark-shell;
> however, now that I'm trying to run it as a standalone application which
> I'm compiling with sbt, it is yielding the error:
>
>
> *java.lang.NoSuchMethodError:
> org.json4s.jackson.JsonMethods$.parse$default$3()Z at
> ClusterIncidents$$anonfun$1.apply(ClusterInciden*
>
> I'm doing "sbt clean package" and then spark-submit of the resulting jar,
> however it seems like either my driver or workers don't have the json4s
> dependency, and therefore can't find the parse method.
>
> Any idea on how to solve this depdendency problem?
>
> thanks in advance
>



-- 
Best Regards

Jeff Zhang


Re: spark-submit for dependent jars

2015-12-21 Thread Jeff Zhang
Put /test/target/spark16-0.0.1-SNAPSHOT.jar as the last argument

./spark-submit --master local  --class test.Main  --jars
/home/user/download/jar/ojdbc7.jar /test/target/spark16-0.0.1-SNAPSHOT.jar

On Mon, Dec 21, 2015 at 9:15 PM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> How to add dependent jars in spark-submit command. For example: Oracle.
> Could you please help me to resolve this issue
>
> I have a standalone cluster. One Master and One slave.
>
> I have used below command it is not working
>
> ./spark-submit --master local  --class test.Main
> /test/target/spark16-0.0.1-SNAPSHOT.jar --jars
> /home/user/download/jar/ojdbc7.jar
>
> *I'm getting below exception :*
>
> Exception in thread "main" java.sql.SQLException: No suitable driver found
> for jdbc:oracle:thin:@:1521:xxx
> at java.sql.DriverManager.getConnection(DriverManager.java:596)
> at java.sql.DriverManager.getConnection(DriverManager.java:187)
> at
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:188)
> at
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:181)
> at
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:121)
> at
> org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:91)
> at
> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
> at com.cisco.ss.etl.utils.ETLHelper$class.getData(ETLHelper.scala:22)
> at com.cisco.ss.etl.Main$.getData(Main.scala:9)
> at com.cisco.ss.etl.Main$delayedInit$body.apply(Main.scala:13)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
> at
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:71)
> at scala.App$$anonfun$main$1.apply(App.scala:71)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
> at scala.App$class.main(App.scala:71)
> at com.cisco.ss.etl.Main$.main(Main.scala:9)
> at com.cisco.ss.etl.Main.main(Main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Regards,
> Rajesh
>



-- 
Best Regards

Jeff Zhang


Re: spark-submit for dependent jars

2015-12-21 Thread Jeff Zhang
ion.(JDBCRelation.scala:91)
>>> at
>>> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)
>>> at
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
>>> at
>>> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
>>> at com.cisco.ss.etl.utils.ETLHelper$class.getData(ETLHelper.scala:22)
>>> at com.cisco.ss.etl.Main$.getData(Main.scala:9)
>>> at com.cisco.ss.etl.Main$delayedInit$body.apply(Main.scala:13)
>>> at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>>> at
>>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>> at scala.App$$anonfun$main$1.apply(App.scala:71)
>>> at scala.App$$anonfun$main$1.apply(App.scala:71)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>>> at scala.App$class.main(App.scala:71)
>>> at com.cisco.ss.etl.Main$.main(Main.scala:9)
>>> at com.cisco.ss.etl.Main.main(Main.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> Regards,
>>> Rajesh
>>>
>>
>>
>


-- 
Best Regards

Jeff Zhang


Re: trouble implementing Transformer and calling DataFrame.withColumn()

2015-12-21 Thread Jeff Zhang
In your case, I would suggest you extend UnaryTransformer, which is much
easier.

Yeah, I have to admit that there's no documentation about how to write a custom
Transformer. I think we need to add that, since writing a custom Transformer
is a very typical task in machine learning.
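
For the archive, a minimal Scala sketch of what extending UnaryTransformer
looks like (the class name and uid prefix are made up; a Java version follows
the same shape):

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}

class LabelToBinaryTransformer(override val uid: String)
    extends UnaryTransformer[String, String, LabelToBinaryTransformer] {

  def this() = this(Identifiable.randomUID("labelToBinary"))

  // the plain function that is applied to every value of the input column
  override protected def createTransformFunc: String => String =
    label => if (label == "noise") label else "signal"

  // type of the generated output column
  override protected def outputDataType: DataType = StringType

  override def copy(extra: ParamMap): LabelToBinaryTransformer = defaultCopy(extra)
}

// usage: new LabelToBinaryTransformer().setInputCol("label").setOutputCol("binomialLabel")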

On Tue, Dec 22, 2015 at 9:54 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

>
> I am trying to port the following python function to Java 8. I would like
> my java implementation to implement Transformer so I can use it in a
> pipeline.
>
> I am having a heck of a time trying to figure out how to create a Column
> variable I can pass to DataFrame.withColumn(). As far as I know
> withColumn() the only way to append a column to a data frame.
>
> Any comments or suggestions would be greatly appreciated
>
> Andy
>
>
> def convertMultinomialLabelToBinary(dataFrame):
> newColName = "binomialLabel"
> binomial = udf(lambda labelStr: labelStr if (labelStr == "noise") else 
> “signal", StringType())
> ret = dataFrame.withColumn(newColName, binomial(dataFrame["label"]))
> return ret
> trainingDF2 = convertMultinomialLabelToBinary(trainingDF1)
>
>
>
> public class LabelToBinaryTransformer extends Transformer {
>
> private static final long serialVersionUID = 4202800448830968904L;
>
> private  final UUID uid = UUID.randomUUID();
>
> public String inputCol;
>
> public String outputCol;
>
>
>
> @Override
>
> public String uid() {
>
> return uid.toString();
>
> }
>
>
> @Override
>
> public Transformer copy(ParamMap pm) {
>
> Params xx = defaultCopy(pm);
>
> return ???;
>
> }
>
>
> @Override
>
> public DataFrame transform(DataFrame df) {
>
> MyUDF myUDF = new MyUDF(myUDF, null, null);
>
> Column c = df.col(inputCol);
>
> ??? UDF apply does not take a col
>
> Column col = myUDF.apply(df.col(inputCol));
>
> DataFrame ret = df.withColumn(outputCol, col);
>
> return ret;
>
> }
>
>
> @Override
>
> public StructType transformSchema(StructType arg0) {
>
>*??? What is this function supposed to do???*
>
>   ???Is this the type of the new output column
>
> }
>
>
>
> class MyUDF extends UserDefinedFunction {
>
> public MyUDF(Object f, DataType dataType, Seq inputTypes)
> {
>
> super(f, dataType, inputTypes);
>
> ??? Why do I have to implement this constructor ???
>
> ??? What are the arguments ???
>
> }
>
>
>
> @Override
>
> public
>
> Column apply(scala.collection.Seq exprs) {
>
> What do you do with a scala seq?
>
> return ???;
>
> }
>
> }
>
> }
>
>
>


-- 
Best Regards

Jeff Zhang


Re: get parameters of spark-submit

2015-12-21 Thread Jeff Zhang
I don't understand your question. These parameters are passed to your program
as args of the main function.
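
A minimal sketch of what that means (object and variable names are
hypothetical): the two hdfs:// paths placed after the application jar on the
spark-submit command line show up as args(0) and args(1) in main.

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val inputPath1 = args(0)  // first hdfs:// path from spark-submit
    val inputPath2 = args(1)  // second hdfs:// path from spark-submit
    // ... create the SparkContext and read the two files ...
  }
}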

On Mon, Dec 21, 2015 at 9:09 PM, Bonsen <hengbohe...@126.com> wrote:

> 1. I write my Scala class and package it (without hard-coding the HDFS
> files' paths; I just use the paths from spark-submit's parameters).
> 2. Then, if I submit like this:
> ${SPARK_HOME/bin}/spark-submit \
> --master  \
>  \
> hdfs:// \
> hdfs:// \
>
> what should I do to get the two HDFS files' paths in my Scala class's
> code (before packaging the jar file)?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/get-parameters-of-spark-submit-tp25749.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: DataFrame operations

2015-12-20 Thread Jeff Zhang
If it does not return the column you expect, then what does it return? Would
you have 2 columns with the same column name?
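
For what Eran describes (applying a String => String function to one column),
the usual route is to wrap the function in a UDF and pass the resulting Column
to withColumn. A minimal sketch, reusing the column names from the question
and a made-up computeString:

import org.apache.spark.sql.functions.udf

val computeString = udf((s: String) => s.toUpperCase)  // stand-in for the real function
val result = jsonData.withColumn("computedField", computeString(jsonData("hse")))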

On Sun, Dec 20, 2015 at 7:40 PM, Eran Witkon <eranwit...@gmail.com> wrote:

> Hi,
>
> I am a bit confused with dataframe operations.
> I have a function which takes a string and returns a string
> I want to apply this functions on all rows on a single column in my
> dataframe
>
> I was thinking of the following:
> jsonData.withColumn("computedField",computeString(jsonData("hse")))
>
> BUT jsonData("hse") return a column not the row data
> What am I missing here?
>



-- 
Best Regards

Jeff Zhang


Re: Spark batch getting hung up

2015-12-20 Thread Jeff Zhang
>>> Would the driver not wait till all the stuff related to test1 is
completed before calling test2 as test2 is dependent on test1?
>>> val test1 =RDD1.mapPartitions.()
>>> val test2 = test1.mapPartititions()

On the driver side these 2 lines of code will be executed, but the real
computation on the executors won't run until you call an action on the RDD (I
suppose you call one after this code).

You might need to check on the executor side why the hang issue happens.
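
To make the lazy-evaluation point concrete, a small self-contained sketch:

val rdd1  = sc.parallelize(1 to 100, 4)
val test1 = rdd1.mapPartitions(iter => iter.map(_ + 1))   // lazy: only records the lineage
val test2 = test1.mapPartitions(iter => iter.map(_ * 2))  // lazy: still nothing has run
test2.count()  // the action: both mapPartitions steps execute on the executors here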


On Mon, Dec 21, 2015 at 11:04 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> I see this happens when there is a deadlock situation. The RDD test1 has a
> Couchbase call and it seems to be having threads hanging there. Eventhough
> all the connections are closed I see the threads related to Couchbase
> causing the job to hang for sometime before it gets cleared up.
>
> Would the driver not wait till all the stuff related to test1 is completed
> before calling test2 as test2 is dependent on test1?
>
> val test1 =RDD1.mapPartitions.()
>
> val test2 = test1.mapPartititions()
>
> On Sat, Dec 19, 2015 at 12:24 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> First you need to know where the hang happens (driver or executor),
>> checking log would be helpful
>>
>> On Sat, Dec 19, 2015 at 12:25 AM, SRK <swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> My Spark Batch job seems to hung up sometimes for a long time before it
>>> starts the next stage/exits. Basically it happens when it has
>>> mapPartition/foreachPartition in a stage. Any idea as to why this is
>>> happening?
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-batch-getting-hung-up-tp25735.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -----
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: Spark batch getting hung up

2015-12-19 Thread Jeff Zhang
First you need to know where the hang happens (driver or executor); checking
the logs would be helpful.

On Sat, Dec 19, 2015 at 12:25 AM, SRK <swethakasire...@gmail.com> wrote:

> Hi,
>
> My Spark Batch job seems to hung up sometimes for a long time before it
> starts the next stage/exits. Basically it happens when it has
> mapPartition/foreachPartition in a stage. Any idea as to why this is
> happening?
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-batch-getting-hung-up-tp25735.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Dynamic jar loading

2015-12-19 Thread Jeff Zhang
Actually I would say yes and no. Yes in the sense that the jar will be fetched
by the executors and added to their classpath; no in the sense that it will not
be added to the classpath of the driver. That means you cannot invoke the
classes in the jar explicitly, but you can call them indirectly, like the
following (or, if the jar is only a dependency, it won't be called directly):

rdd.map(e => { Class.forName("com.zjffdu.tutorial.spark.java.MyStack"); e }).collect()

On Sat, Dec 19, 2015 at 5:47 AM, Jim Lohse <j...@megalearningllc.com> wrote:

> I am going to say no, but have not actually tested this. Just going on
> this line in the docs:
>
> http://spark.apache.org/docs/latest/configuration.html
>
> spark.driver.extraClassPath (none) Extra classpath entries to prepend to
> the classpath of the driver.
> *Note:* In client mode, this config must not be set through the SparkConf
> directly in your application, because the driver JVM has already started at
> that point. Instead, please set this through the --driver-class-path
> command line option or in your default properties file.
>
>
>
> On 12/17/2015 07:53 AM, amarouni wrote:
>
> Hello guys,
>
> Do you know if the method SparkContext.addJar("file:///...") can be used
> on a running context (an already started spark-shell) ?
> And if so, does it add the jar to the class-path of the Spark workers
> (Yarn containers in case of yarn-client) ?
>
> Thanks,
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


-- 
Best Regards

Jeff Zhang


Re: Base ERROR

2015-12-17 Thread Jeff Zhang
I believe this is an HBase issue; you'd better ask on the HBase mailing list.



On Fri, Dec 18, 2015 at 9:57 AM, censj <ce...@lotuseed.com> wrote:

> hi,all:
> I write data to HBase, but HBase raises this ERROR. Could you help me?
>
>
> r.KeeperException$SessionExpiredException: KeeperErrorCode = Session
> expired for /hbase-unsecure/rs/byd0157,16020,1449106975377
> 2015-12-17 21:24:29,854 WARN  [regionserver/byd0157/192.168.0.157:16020]
> zookeeper.RecoverableZooKeeper: Possibly transient ZooKeeper,
> quorum=byd0151:2181,byd0150:2181,byd0152:2181,
> exception=org.apache.zookeeper.KeeperException$SessionExpiredException:
> KeeperErrorCode = Session expired for
> /hbase-unsecure/rs/byd0157,16020,1449106975377
> 2015-12-17 21:24:29,854 ERROR [regionserver/byd0157/192.168.0.157:16020]
> zookeeper.RecoverableZooKeeper: ZooKeeper delete failed after 4 attempts
> 2015-12-17 21:24:29,854 WARN  [regionserver/byd0157/192.168.0.157:16020]
> regionserver.HRegionServer: Failed deleting my ephemeral node
> org.apache.zookeeper.KeeperException$SessionExpiredException:
> KeeperErrorCode = Session expired for
> /hbase-unsecure/rs/byd0157,16020,1449106975377
>at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
>at
> org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
>at
> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.delete(RecoverableZooKeeper.java:179)
>at
> org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1345)
>at
> org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1334)
>at
> org.apache.hadoop.hbase.regionserver.HRegionServer.deleteMyEphemeralNode(HRegionServer.java:1393)
>at
> org.apache.hadoop.hbase.regionserver.HRegionServer.run(HRegionServer.java:1076)
>at java.lang.Thread.run(Thread.java:745)
> 2015-12-17 21:24:29,855 INFO  [regionserver/byd0157/192.168.0.157:16020]
> regionserver.HRegionServer: stopping server byd0157,16020,1449106975377;
> zookeeper connection closed.
> 2015-12-17 21:24:29,855 INFO  [regionserver/byd0157/192.168.0.157:16020]
> regionserver.HRegionServer: regionserver/byd0157/192.168.0.157:16020
> exiting
> 2015-12-17 21:24:29,858 ERROR [main]
> regionserver.HRegionServerCommandLine: Region server exiting
> java.lang.RuntimeException: HRegionServer Aborted
>at
> org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.start(HRegionServerCommandLine.java:68)
>at
> org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine.run(HRegionServerCommandLine.java:87)
>at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>at
> org.apache.hadoop.hbase.util.ServerCommandLine.doMain(ServerCommandLine.java:126)
>at
> org.apache.hadoop.hbase.regionserver.HRegionServer.main(HRegionServer.java:2641)
> 2015-12-17 21:24:29,940 INFO  [Thread-6] regionserver.ShutdownHook:
> Shutdown hook starting; hbase.shutdown.hook=true;
> fsShutdownHook=org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer@6de54b40
> 2015-12-17 21:24:29,942 INFO  [Thread-6] regionserver.ShutdownHook:
> Starting fs shutdown hook thread.
> 2015-12-17 21:24:29,953 INFO  [Thread-6] regionserver.ShutdownHook:
> Shutdown hook finished.
>
>
>


-- 
Best Regards

Jeff Zhang


Re: Access row column by field name

2015-12-16 Thread Jeff Zhang
use Row.getAs[String](fieldname)
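
Applied to the example below (reusing its sqlContext and rawIncRdd;
"field_name" stands for whatever key the JSON actually has):

val df = sqlContext.jsonRDD(rawIncRdd)
df.foreach(row => println(row.getAs[String]("field_name")))
// row.fieldIndex("field_name") returns the positional index if one is needed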

On Thu, Dec 17, 2015 at 10:58 AM, Daniel Valdivia <h...@danielvaldivia.com>
wrote:

> Hi,
>
> I'm processing the json I have in a text file using DataFrames, however
> right now I'm trying to figure out a way to access a certain value within
> the rows of my data frame if I only know the field name and not the
> respective field position in the schema.
>
> I noticed that row.schema and row.dtypes give me information about the
> auto-generated schema, but I cannot see a straightforward path for this.
> I'm trying to create a PairRdd out of this.
>
> Is there any easy way to figure out the field position by its field name
> (the key it had in the json)?
>
> so this
>
> val sqlContext = new SQLContext(sc)
> val rawIncRdd = sc.textFile("
> hdfs://1.2.3.4:8020/user/hadoop/incidents/unstructured/inc-0-500.txt")
>  val df = sqlContext.jsonRDD(rawIncRdd)
> df.foreach(line => println(line.getString(0)))
>
>
> would turn into something like this
>
> val sqlContext = new SQLContext(sc)
> val rawIncRdd = sc.textFile("
> hdfs://1.2.3.4:8020/user/hadoop/incidents/unstructured/inc-0-500.txt")
>  val df = sqlContext.jsonRDD(rawIncRdd)
> df.foreach(line => println(line.getString(*"field_name"*)))
>
> thanks for the advice
>



-- 
Best Regards

Jeff Zhang


Re: hiveContext: storing lookup of partitions

2015-12-16 Thread Jeff Zhang
Oh, you are using S3. As I remember, S3 has performance issues when
processing a large number of files.



On Wed, Dec 16, 2015 at 7:58 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> The HIVE table has very large number of partitions around 365 * 5 * 10 and
> when I say hivemetastore to start running queries on it (the one with
> .count() or .show()) then it takes around 2 hours before the job starts in
> SPARK.
>
> On the pyspark screen I can see that it is parsing the S3 locations for
> these 2 hours.
>
> Regards,
> Gourav
>
> On Wed, Dec 16, 2015 at 3:38 AM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> >>> Currently it takes around 1.5 hours for me just to cache in the
>> partition information and after that I can see that the job gets queued in
>> the SPARK UI.
>> I guess you mean the stage of getting the split info. I suspect it might
>> be your cluster issue (or metadata store), unusually it won't take such
>> long time for splitting.
>>
>> On Wed, Dec 16, 2015 at 8:06 AM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a HIVE table with few thousand partitions (based on date and
>>> time). It takes a long time to run if for the first time and then
>>> subsequently it is fast.
>>>
>>> Is there a way to store the cache of partition lookups so that every
>>> time I start a new SPARK instance (cannot keep my personal server running
>>> continuously), I can immediately restore back the temptable in hiveContext
>>> without asking it go again and cache the partition lookups?
>>>
>>> Currently it takes around 1.5 hours for me just to cache in the
>>> partition information and after that I can see that the job gets queued in
>>> the SPARK UI.
>>>
>>> Regards,
>>> Gourav
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

2015-12-15 Thread Jeff Zhang
>>> *15/12/16 10:22:01 WARN cluster.YarnScheduler: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources*

That means you don't have enough resources for your application; please check
your Hadoop web UI.

On Wed, Dec 16, 2015 at 10:32 AM, zml张明磊 <mingleizh...@ctrip.com> wrote:

> Last night I ran the jar in my pseudo-distributed mode without any WARN
> or ERROR. However, today I am getting the WARN below, directly leading to the
> ERROR. My computer has 8 GB of memory, and I don't think that is the issue the
> WARN log describes. What's wrong? The code hasn't changed, and the environment
> hasn't changed either. So strange. Can anybody help me? Why…….
>
>
>
> Thanks.
>
> Minglei.
>
>
>
> Here is the submit job script
>
>
>
> /bin/spark-submit --master local[*] --driver-memory 8g --executor-memory
> 8g  --class com.ctrip.ml.client.Client
>  /root/di-ml-tool/target/di-ml-tool-1.0-SNAPSHOT.jar
>
>
>
> Error below
>
> *15/12/16 10:22:01 WARN cluster.YarnScheduler: Initial job has not
> accepted any resources; check your cluster UI to ensure that workers are
> registered and have sufficient resources*
>
> 15/12/16 10:22:04 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
> ApplicationMaster has disassociated: 10.32.3.21:48311
>
> 15/12/16 10:22:04 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint:
> ApplicationMaster has disassociated: 10.32.3.21:48311
>
> 15/12/16 10:22:04 WARN remote.ReliableDeliverySupervisor: Association with
> remote system [akka.tcp://sparkYarnAM@10.32.3.21:48311] has failed,
> address is now gated for [5000] ms. Reason is: [Disassociated].
>
> *15/12/16 10:22:04 ERROR cluster.YarnClientSchedulerBackend: Yarn
> application has already exited with state FINISHED!*
>
>
>
> Exception in thread "main" 15/12/16 10:22:04 INFO
> cluster.YarnClientSchedulerBackend: Shutting down all executors
>
> Exception in thread "Yarn application state monitor"
> org.apache.spark.SparkException: Error asking standalone scheduler to shut
> down executors
>
> at
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261)
>
> at
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266)
>
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158)
>
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416)
>
> at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411)
>
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1644)
>
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139)
>
> Caused by: java.lang.InterruptedException
>
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>
> at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
>
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
>
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
> at scala.concurrent.Await$.result(package.scala:107)
>
> at
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>
> at
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
>
> at
> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
>
>
>



-- 
Best Regards

Jeff Zhang


Re: Can't create UDF through thriftserver, no error reported

2015-12-15 Thread Jeff Zhang
It should be resolved by this ticket
https://issues.apache.org/jira/browse/SPARK-11191



On Wed, Dec 16, 2015 at 3:14 AM, Antonio Piccolboni <anto...@piccolboni.info
> wrote:

> Hi,
> I am trying to create a UDF using the thiftserver. I followed this example
> <https://gist.github.com/airawat/7461612>, which is originally for hive.
> My understanding is that the thriftserver creates a hivecontext and Hive
> UDFs should be supported. I then sent this query to the thriftserver (I use
> the RJDBC module for R but I doubt any other JDBC client would be any
> different):
>
>
> CREATE TEMPORARY FUNCTION NVL2 AS 'khanolkar.HiveUDFs.NVL2GenericUDF'
>
> I only changed some name wrt  the posted examples, but I think the class
> was found just right because 1)There's no errors in the log or console 2)I
> can generate a class not found error mistyping the class name, and I see it
> in the logs 3) I can use the reflect builtin to invoke a different function
> that I wrote and supplied to spark in the same way (--jars option to
> start-thriftserver)
>
> After this, I can't use the NVL2 function in a query and I can't even do a
>  DESCRIBE query on it,  nor does it list with SHOW FUNCTIONS. I tried both
> 1.5.1 and 1.6.0-rc2 built with thriftserver support for Hadoop 2.6
>
> I know the HiveContext is slightly behind the latest Hive as far as
> features, I believe one or two revs, so that may be one potential problem,
> but all these feature I believe are present in Hive 0.11 and should have
> made it into Spark. At the very least, I would like to see some message in
> the logs and console so that I can find the error of my ways, repent and
> fix my code. Any suggestions? Anything I should post to support
> troubleshooting? Is this JIRA-worthy? Thanks
>
> Antonio
>
>
>
>


-- 
Best Regards

Jeff Zhang


Re: hiveContext: storing lookup of partitions

2015-12-15 Thread Jeff Zhang
>>> Currently it takes around 1.5 hours for me just to cache in the
partition information and after that I can see that the job gets queued in
the SPARK UI.
I guess you mean the stage of getting the split info. I suspect it might be an
issue with your cluster (or metadata store); usually it won't take such a long
time for splitting.

On Wed, Dec 16, 2015 at 8:06 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> I have a HIVE table with few thousand partitions (based on date and time).
> It takes a long time to run if for the first time and then subsequently it
> is fast.
>
> Is there a way to store the cache of partition lookups so that every time
> I start a new SPARK instance (cannot keep my personal server running
> continuously), I can immediately restore back the temptable in hiveContext
> without asking it go again and cache the partition lookups?
>
> Currently it takes around 1.5 hours for me just to cache in the partition
> information and after that I can see that the job gets queued in the SPARK
> UI.
>
> Regards,
> Gourav
>



-- 
Best Regards

Jeff Zhang


Re: [SparkR] Is rdd in SparkR deprecated ?

2015-12-14 Thread Jeff Zhang
Thanks Felix. I was just curious when I read the code.

On Tue, Dec 15, 2015 at 1:32 AM, Felix Cheung <felixcheun...@hotmail.com>
wrote:

> RDD API in SparkR is not officially supported. You could still access them
> with the SparkR::: prefix though.
>
> May I ask what uses you have for them? Would the DataFrame API sufficient?
>
>
>
>
>
> On Mon, Dec 14, 2015 at 4:26 AM -0800, "Jeff Zhang" <zjf...@gmail.com>
> wrote:
>
> From the source code of SparkR, seems SparkR support rdd api. But there's
> no documentation on that. (
> http://spark.apache.org/docs/latest/sparkr.html ) So I guess it is
> deprecated, is that right ?
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Best Regards

Jeff Zhang


[SparkR] Is rdd in SparkR deprecated ?

2015-12-14 Thread Jeff Zhang
From the source code of SparkR, it seems SparkR supports the RDD API. But there
is no documentation on that (http://spark.apache.org/docs/latest/sparkr.html),
so I guess it is deprecated. Is that right?

-- 
Best Regards

Jeff Zhang


Re: how to make a dataframe of Array[Doubles] ?

2015-12-14 Thread Jeff Zhang
Please use a tuple instead of an array (the element type must implement the
Product trait if you want to convert an RDD to a DataFrame):

val testvec = Array( (1.0, 2.0, 3.0, 4.0), (5.0, 6.0, 7.0, 8.0))
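
If the end goal really is a single array-typed column (as mentioned for
Parquet), another option is to wrap the array in a case class, since a Product
with an Array[Double] field maps to an array<double> column. A sketch with
made-up names and output path:

case class Vec(values: Array[Double])

import sqlContext.implicits._
val testvec = Array(Array(1.0, 2.0, 3.0, 4.0), Array(5.0, 6.0, 7.0, 8.0))
val df = sc.parallelize(testvec.map(Vec(_))).toDF()  // schema: values: array<double>
df.write.parquet("/tmp/testvec.parquet")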

On Tue, Dec 15, 2015 at 1:12 PM, AlexG <swift...@gmail.com> wrote:

> My attempts to create a dataframe of Array[Doubles], I get an error about
> RDD[Array[Double]] not having a toDF function:
>
> import sqlContext.implicits._
> val testvec = Array( Array(1.0, 2.0, 3.0, 4.0), Array(5.0, 6.0, 7.0, 8.0))
> val testrdd = sc.parallelize(testvec)
> testrdd.toDF
>
> gives
>
> :29: error: value toDF is not a member of
> org.apache.spark.rdd.RDD[Array[Double]]
>   testrdd.toD
>
> on the other hand, if I make the dataframe more complicated, e.g.
> Tuple2[String, Array[Double]], the transformation goes through:
>
> val testvec = Array( ("row 1", Array(1.0, 2.0, 3.0, 4.0)), ("row 2",
> Array(5.0, 6.0, 7.0, 8.0)) )
> val testrdd = sc.parallelize(testvec)
> testrdd.toDF
>
> gives
> testrdd: org.apache.spark.rdd.RDD[(String, Array[Double])] =
> ParallelCollectionRDD[1] at parallelize at :29
> res3: org.apache.spark.sql.DataFrame = [_1: string, _2: array]
>
> What's the cause of this, and how can I get around it to create a dataframe
> of Array[Double]? My end goal is to store that dataframe in Parquet (yes, I
> do want to store all the values in a single column, not individual columns)
>
> I am using Spark 1.5.2
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-dataframe-of-Array-Doubles-tp25704.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Database does not exist: (Spark-SQL ===> Hive)

2015-12-14 Thread Jeff Zhang
Did you put hive-site.xml on the classpath?

On Tue, Dec 15, 2015 at 11:14 AM, Gokula Krishnan D <email2...@gmail.com>
wrote:

> Hello All -
>
>
> I tried to execute a Spark-Scala program in order to create a table in
> Hive and faced a couple of errors, so I just tried to execute "show tables"
> and "show databases".
>
> And I have already created a database named "test_db", but I have
> encountered the error "Database does not exist".
>
> *Note: I do see couple of posts related to this error but nothing was
> helpful for me.*
>
> 
> =
> name := "ExploreSBT_V1"
>
> version := "1.0"
>
> scalaVersion := "2.11.5"
>
> libraryDependencies
> ++=Seq("org.apache.spark"%%"spark-core"%"1.3.0","org.apache.spark"%%"spark-sql"%"1.3.0")
> libraryDependencies += "org.apache.spark"%%"spark-hive"%"1.3.0"
>
> =
> [image: Inline image 1]
>
> Error: Encountered the following exceptions
> :org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution
> Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Database
> does not exist: test_db
> 15/12/14 18:49:57 ERROR HiveContext:
> ==
> HIVE FAILURE OUTPUT
> ==
>
>
>
>
>OK
> FAILED: Execution Error, return code 1 from
> org.apache.hadoop.hive.ql.exec.DDLTask. Database does not exist: test_db
>
> ==
> END HIVE FAILURE OUTPUT
> ==
>
>
> Process finished with exit code 0
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>



-- 
Best Regards

Jeff Zhang


Re: [SparkR] Any reason why saveDF's mode is append by default ?

2015-12-14 Thread Jeff Zhang
Thanks Shivaram. I created https://issues.apache.org/jira/browse/SPARK-12318
and will work on it.

On Mon, Dec 14, 2015 at 4:13 PM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> I think its just a bug -- I think we originally followed the Python
> API (in the original PR [1]) but the Python API seems to have been
> changed to match Scala / Java in
> https://issues.apache.org/jira/browse/SPARK-6366
>
> Feel free to open a JIRA / PR for this.
>
> Thanks
> Shivaram
>
> [1] https://github.com/amplab-extras/SparkR-pkg/pull/199/files
>
> On Sun, Dec 13, 2015 at 11:58 PM, Jeff Zhang <zjf...@gmail.com> wrote:
> > It is inconsistent with scala api which is error by default. Any reason
> for
> > that ? Thanks
> >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
>



-- 
Best Regards

Jeff Zhang


[SparkR] Any reason why saveDF's mode is append by default ?

2015-12-13 Thread Jeff Zhang
It is inconsistent with the Scala API, where the default mode is "error".
Any reason for that? Thanks



-- 
Best Regards

Jeff Zhang


Re: Spark assembly in Maven repo?

2015-12-10 Thread Jeff Zhang
I don't think making the assembly jar a dependency is good practice. You may
run into jar-hell issues in that case.

On Fri, Dec 11, 2015 at 2:46 PM, Xiaoyong Zhu <xiaoy...@microsoft.com>
wrote:

> Hi Experts,
>
>
>
> We have a project which has a dependency for the following jar
>
>
>
> spark-assembly--hadoop.jar
>
> for example:
>
> spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar
>
>
>
> since this assembly might be updated in the future, I am not sure if there
> is a Maven repo that has the above spark assembly jar? Or should we create
> & upload it to Maven central?
>
>
>
> Thanks!
>
>
>
> Xiaoyong
>
>
>



-- 
Best Regards

Jeff Zhang


Re: Can't create UDF's in spark 1.5 while running using the hive thrift service

2015-12-08 Thread Jeff Zhang
leAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> When I ran the same against 1.4 it worked.
>>
>> I've also changed the spark.sql.hive.metastore.version version to be 0.13
>> (similar to what it was in 1.4) and 0.14 but I still get the same errors.
>>
>>
>> Any suggestions?
>>
>> Thanks,
>> Trystan
>>
>>
>


-- 
Best Regards

Jeff Zhang


Re: sparkSQL Load multiple tables

2015-12-02 Thread Jeff Zhang
Do you want to load multiple tables using a SQL query? JdbcRelation can
currently only load a single table; it doesn't accept a SQL query as the
loading command.
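
A rough sketch of the workaround (the url/driver/table/column names below
are only placeholders), loading the tables one by one and then combining
them on the Spark side:

val opts = Map(
  "url"    -> "jdbc:mysql://host:3306/mydb",
  "driver" -> "com.mysql.jdbc.Driver")
val t1 = sqlContext.read.format("jdbc").options(opts + ("dbtable" -> "table1")).load()
val t2 = sqlContext.read.format("jdbc").options(opts + ("dbtable" -> "table2")).load()
// then join on a key, or unionAll if the schemas match
val joined = t1.join(t2, t1("id") === t2("id"))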

On Wed, Dec 2, 2015 at 4:33 PM, censj <ce...@lotuseed.com> wrote:

> hi Fengdong Yu:
> I want to use  sqlContext.read.format('jdbc').options( ... ).load()
> but this function only load a table so  i want to know through some
> operations load multiple tables?
>
>
> On Dec 2, 2015, at 16:28, Fengdong Yu <fengdo...@everstring.com> wrote:
>
> It cannot read multiple tables,
>
> but if your tables have the same columns, you can read them one by one,
> then unionAll them, such as:
>
> val df1 = sqlContext.table("table1")
> val df2 = sqlContext.table("table2")
>
> val df = df1.unionAll(df2)
>
>
>
>
>
> On Dec 2, 2015, at 4:06 PM, censj <ce...@lotuseed.com> wrote:
>
> Dear all,
> Can you tell me how did get past SQLContext load function read  multiple
> tables?
>
>
>
>


-- 
Best Regards

Jeff Zhang


Re: how to skip headers when reading multiple files

2015-12-02 Thread Jeff Zhang
Are you reading CSV files? If so, you can use spark-csv, which supports
skipping the header:

http://spark-packages.org/package/databricks/spark-csv
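
For example, in Scala (the input path is a placeholder):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // treat the first line of each file as the header
  .option("inferSchema", "true")
  .load("hdfs:///data/input/*.csv")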



On Thu, Dec 3, 2015 at 10:52 AM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

> Hi,
> I am new bee to Spark and Scala .
> As one of my requirement to read and process multiple text files with
> headers using DataFrame API .
> How can I skip headers when processing data with DataFrame API
>
> Thanks in advance .
> Regards,
> Divya
>
>


-- 
Best Regards

Jeff Zhang


Re: SparkSQL API to insert DataFrame into a static partition?

2015-12-01 Thread Jeff Zhang
I don't think there's an API for that, but I think it would be reasonable
and helpful for ETL.

As a workaround you can first register your DataFrame as a temp table, and
then use SQL to insert into the static partition.
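
A rough sketch of that workaround (it needs a HiveContext; the table,
column and partition names are only placeholders):

df.registerTempTable("tmp_result")
sqlContext.sql(
  "INSERT OVERWRITE TABLE my_table PARTITION (date='2015-12-01') " +
  "SELECT col_a, col_b FROM tmp_result")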

On Wed, Dec 2, 2015 at 10:50 AM, Isabelle Phan <nlip...@gmail.com> wrote:

> Hello,
>
> Is there any API to insert data into a single partition of a table?
>
> Let's say I have a table with 2 columns (col_a, col_b) and a partition by
> date.
> After doing some computation for a specific date, I have a DataFrame with
> 2 columns (col_a, col_b) which I would like to insert into a specific date
> partition. What is the best way to achieve this?
>
> It seems that if I add a date column to my DataFrame, and turn on dynamic
> partitioning, I can do:
> df.write.partitionBy("date").insertInto("my_table")
> But it seems overkill to use dynamic partitioning function for such a case.
>
>
> Thanks for any pointers!
>
> Isabelle
>
>
>


-- 
Best Regards

Jeff Zhang


No documentation for how to write custom Transformer in ml pipeline ?

2015-11-30 Thread Jeff Zhang
Writing a custom UnaryTransformer is not difficult, but writing a
non-UnaryTransformer is a little tricky (you have to check the source code).
I can't find any documentation about how to write a custom Transformer for
an ml pipeline, yet writing a custom Transformer is a very basic
requirement. Is this because the interface is still considered unstable?
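
For reference, the minimal shape I ended up with after reading the source
(only a sketch against the current 1.5/1.6 ml API; the class and column
names here are made up):

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StructField, StructType}

class ColumnCopier(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("columnCopier"))

  // trivial example: copy an existing column into a new one
  override def transform(dataset: DataFrame): DataFrame =
    dataset.withColumn("value_copy", col("value"))

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+
      StructField("value_copy", schema("value").dataType, nullable = true))

  override def copy(extra: ParamMap): ColumnCopier = defaultCopy(extra)
}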


-- 
Best Regards

Jeff Zhang


Re: General question on using StringIndexer in SparkML

2015-11-29 Thread Jeff Zhang
StringIndexer is an estimator: fitting it produces a model (the
label-to-index mapping) that is used for both training and prediction, so
the mapping stays consistent between the two.

You may want to read this section of spark ml doc
http://spark.apache.org/docs/latest/ml-guide.html#how-it-works
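
In other words, reuse the fitted model on the new data instead of fitting
again. A small sketch (trainDF/newDF and the column names are placeholders):

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().setInputCol("word").setOutputCol("wordIndex")
val indexerModel = indexer.fit(trainDF)           // mapping is fixed from the training data
val trainIndexed = indexerModel.transform(trainDF)
val newIndexed   = indexerModel.transform(newDF)  // the same mapping is reused, not re-learned

The same applies when the StringIndexer sits inside a Pipeline: keep the
PipelineModel returned by pipeline.fit(trainDF) and call its transform() on
the new data, rather than fitting the pipeline a second time.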



On Mon, Nov 30, 2015 at 12:52 AM, Vishnu Viswanath <
vishnu.viswanat...@gmail.com> wrote:

> Thanks for the reply Yanbo.
>
> I understand that the model will be trained using the indexer map created
> during the training stage.
>
> But since I am getting a new set of data during prediction, and I have to
> do StringIndexing on the new data also,
> Right now I am using a new StringIndexer for this purpose, or is there any
> way that I can reuse the Indexer used for training stage.
>
> Note: I am having a pipeline with StringIndexer in it, and I am fitting my
> train data in it and building the model. Then later when i get the new data
> for prediction, I am using the same pipeline to fit the data again and do
> the prediction.
>
> Thanks and Regards,
> Vishnu Viswanath
>
>
> On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang <yblia...@gmail.com> wrote:
>
>> Hi Vishnu,
>>
>> The string and indexer map is generated at model training step and
>> used at model prediction step.
>> It means that the string and indexer map will not changed when
>> prediction. You will use the original trained model when you do
>> prediction.
>>
>> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath <vishnu.viswanat...@gmail.com
>> >:
>> > Hi All,
>> >
>> > I have a general question on using StringIndexer.
>> > StringIndexer gives an index to each label in the feature starting from
>> 0 (
>> > 0 for least frequent word).
>> >
>> > Suppose I am building a model, and I use StringIndexer for transforming
>> on
>> > of my column.
>> > e.g., suppose A was most frequent word followed by B and C.
>> >
>> > So the StringIndexer will generate
>> >
>> > A  0.0
>> > B  1.0
>> > C  2.0
>> >
>> > After building the model, I am going to do some prediction using this
>> model,
>> > So I do the same transformation on my new data which I need to predict.
>> And
>> > suppose the new dataset has C as the most frequent word, followed by B
>> and
>> > A. So the StringIndexer will assign index as
>> >
>> > C 0.0
>> > B 1.0
>> > A 2.0
>> >
>> > These indexes are different from what we used for modeling. So won’t
>> this
>> > give me a wrong prediction if I use StringIndexer?
>> >
>> > --
>> > Thanks and Regards,
>> > Vishnu Viswanath,
>> > www.vishnuviswanath.com
>>
>
>
>
> --
> Thanks and Regards,
> Vishnu Viswanath,
> *www.vishnuviswanath.com <http://www.vishnuviswanath.com>*
>



-- 
Best Regards

Jeff Zhang


Re: Millions of entities in custom Hadoop InputFormat and broadcast variable

2015-11-27 Thread Jeff Zhang
Where do you load all the IDs of your dataset? In your custom
InputFormat#getSplits? getSplits is invoked on the driver side to build the
Partitions, which are then serialized to the executors as part of each task.

Do you put all the IDs in the InputSplit? That would make it pretty large.

In your case, I think you can load the IDs directly rather than writing a
custom Hadoop InputFormat, e.g.

sc.textFile(id_file, 100).map(load data using the id)

Please make sure to use a high partition number (I use 100 here) in
sc.textFile to get high parallelism.
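
A slightly fuller sketch of that idea (loadEntityById stands in for
whatever per-ID lookup your RecordReader does today):

val ids = sc.textFile("hdfs:///path/to/id_file", 100)            // one ID per line
val entities = ids.map(line => loadEntityById(line.trim.toLong)) // your own data-source call
// then convert `entities` to a DataFrame the same way you do now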

On Fri, Nov 27, 2015 at 2:06 PM, Anfernee Xu <anfernee...@gmail.com> wrote:

> Hi Spark experts,
>
> First of all, happy Thanksgiving!
>
> The comes to my question, I have implemented custom Hadoop InputFormat to
> load millions of entities from my data source to Spark(as JavaRDD and
> transform to DataFrame). The approach I took in implementing the custom
> Hadoop RDD is loading all ID's of my data entity(each entity has an unique
> ID: Long) and split the ID list(contains 3 millions of Long number for
> example) into configured splits, each split contains a sub-set of ID's, in
> turn my custom RecordReader will load the full entity(a plain Java Bean)
> from my data source for each ID in the specific split.
>
> My first observation is some Spark tasks were timeout, and looks like
> Spark broadcast variable is being used to distribute my splits, is that
> correct? If so, from performance perspective, what enhancement I can make
> to make it better?
>
> Thanks
>
> --
> --Anfernee
>



-- 
Best Regards

Jeff Zhang


Re: Stop Spark yarn-client job

2015-11-26 Thread Jeff Zhang
Could you attach the yarn AM log ?

On Fri, Nov 27, 2015 at 8:10 AM, Jagat Singh <jagatsi...@gmail.com> wrote:

> Hi,
>
> What is the correct way to stop fully the Spark job which is running as
> yarn-client using spark-submit.
>
> We are using sc.stop in the code and can see the job still running (in
> yarn resource manager) after final hive insert is complete.
>
> The code flow is
>
> start context
> do somework
> insert to hive
> sc.stop
>
> This is sparkling water job is that matters.
>
> Is there anything else needed ?
>
> Thanks,
>
> J
>
>
>


-- 
Best Regards

Jeff Zhang


Re: Spark on yarn vs spark standalone

2015-11-26 Thread Jeff Zhang
If your cluster is a dedicated Spark cluster (only running Spark jobs, no
other workloads like Hive/Pig/MR), then Spark standalone would be fine.
Otherwise I think YARN would be a better option.

On Fri, Nov 27, 2015 at 3:36 PM, cs user <acldstk...@gmail.com> wrote:

> Hi All,
>
> Apologies if this question has been asked before. I'd like to know if
> there are any downsides to running spark over yarn with the --master
> yarn-cluster option vs having a separate spark standalone cluster to
> execute jobs?
>
> We're looking at installing a hdfs/hadoop cluster with Ambari and
> submitting jobs to the cluster using yarn, or having an Ambari cluster and
> a separate standalone spark cluster, which will run the spark jobs on data
> within hdfs.
>
> With yarn, will we still get all the benefits of spark?
>
> Will it be possible to process streaming data?
>
> Many thanks in advance for any responses.
>
> Cheers!
>



-- 
Best Regards

Jeff Zhang


Re: Optimizing large collect operations

2015-11-26 Thread Jeff Zhang
For such a large output, I would suggest doing the subsequent processing in
the cluster rather than in the driver (use the RDD API for that).
If you really want to pull it to the driver, you can first save it to HDFS
and then read it back using the HDFS API, to avoid the Akka issue.
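
Roughly (the output path is only a placeholder):

// compute and write in the cluster instead of collectAsMap()
rdd.map { case (k, v) => k + "\t" + v }.saveAsTextFile("hdfs:///tmp/large-output")

// if the driver really needs the data, read it back through the HDFS client
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
val parts = fs.listStatus(new Path("hdfs:///tmp/large-output")).map(_.getPath)
// then fs.open(...) each part file and parse it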

On Fri, Nov 27, 2015 at 2:41 PM, Gylfi <gy...@berkeley.edu> wrote:

> Hi.
>
> I am doing very large collectAsMap() operations, about 10,000,000 records,
> and I am getting
> "org.apache.spark.SparkException: Error communicating with
> MapOutputTracker"
> errors..
>
> details:
> "org.apache.spark.SparkException: Error communicating with MapOutputTracker
> at
> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
> at
>
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
> at
>
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
> at
>
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Error sending message [message
> =
> GetMapOutputStatuses(1)]
> at
> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
> at
> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
> ... 12 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
>
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at
> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
> ... 13 more"
>
> I have already set set the akka.timeout to 300 etc.
> Anyone have any ideas on what the problem could be ?
>
> Regares,
> Gylfi.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Optimizing-large-collect-operations-tp25498.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Adding new column to Dataframe

2015-11-25 Thread Jeff Zhang
>>> I tried to use df.withColumn but I am getting below exception.

What is rowNumber here? A UDF? You can use monotonicallyIncreasingId to
generate an id.

>>> Also, is it possible to add a column from one dataframe to another?

You can't directly add a column from one DataFrame to another, since they
may have a different number of rows. You'd better use a join to correlate
the 2 DataFrames.
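
Rough sketches of both (Scala, 1.5 API; the join key is a placeholder):

import org.apache.spark.sql.functions._

// add a unique (but not consecutive) id column
val withId = df.withColumn("row", monotonicallyIncreasingId())

// "adding a column from another dataframe" is really a join on some key
val joined = df.join(df2, df("key") === df2("key"))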

On Thu, Nov 26, 2015 at 6:39 AM, Vishnu Viswanath <
vishnu.viswanat...@gmail.com> wrote:

> Hi,
>
> I am trying to add the row number to a spark dataframe.
> This is my dataframe:
>
> scala> df.printSchema
> root
> |-- line: string (nullable = true)
>
> I tried to use df.withColumn but I am getting below exception.
>
> scala> df.withColumn("row",rowNumber)
> org.apache.spark.sql.AnalysisException: unresolved operator 'Project 
> [line#2326,'row_number() AS row#2327];
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
> at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:174)
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
>
> Also, is it possible to add a column from one dataframe to another?
> something like
>
> scala> df.withColumn("line2",df2("line"))
>
> org.apache.spark.sql.AnalysisException: resolved attribute(s) line#2330 
> missing from line#2326 in operator !Project [line#2326,line#2330 AS 
> line2#2331];
>
> ​
>
> Thanks and Regards,
> Vishnu Viswanath
> *www.vishnuviswanath.com <http://www.vishnuviswanath.com>*
>



-- 
Best Regards

Jeff Zhang


Re: SparkR DataFrame , Out of memory exception for very small file.

2015-11-23 Thread Jeff Zhang
>>> Do I need to create a new DataFrame for every update to the DataFrame
like
addition of new column or  need to update the original sales DataFrame.

Yes. DataFrames are immutable, and every mutating operation on a DataFrame
produces a new DataFrame.



On Mon, Nov 23, 2015 at 4:44 PM, Vipul Rai <vipulrai8...@gmail.com> wrote:

> Hello Rui,
>
> Sorry , What I meant was the resultant of the original dataframe to which
> a new column was added gives a new DataFrame.
>
> Please check this for more
>
> https://spark.apache.org/docs/1.5.1/api/R/index.html
>
> Check for
> WithColumn
>
>
> Thanks,
> Vipul
>
>
> On 23 November 2015 at 12:42, Sun, Rui <rui@intel.com> wrote:
>
>> Vipul,
>>
>> Not sure if I understand your question. DataFrame is immutable. You can't
>> update a DataFrame.
>>
>> Could you paste some log info for the OOM error?
>>
>> -Original Message-
>> From: vipulrai [mailto:vipulrai8...@gmail.com]
>> Sent: Friday, November 20, 2015 12:11 PM
>> To: user@spark.apache.org
>> Subject: SparkR DataFrame , Out of memory exception for very small file.
>>
>> Hi Users,
>>
>> I have a general doubt regarding DataFrames in SparkR.
>>
>> I am trying to read a file from Hive and it gets created as DataFrame.
>>
>> sqlContext <- sparkRHive.init(sc)
>>
>> #DF
>> sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true',
>>  source = "com.databricks.spark.csv", inferSchema='true')
>>
>> registerTempTable(sales,"Sales")
>>
>> Do I need to create a new DataFrame for every update to the DataFrame
>> like addition of new column or  need to update the original sales DataFrame.
>>
>> sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as a")
>>
>>
>> Please help me with this , as the orignal file is only 20MB but it throws
>> out of memory exception on a cluster of 4GB Master and Two workers of 4GB
>> each.
>>
>> Also, what is the logic with DataFrame do I need to register and drop
>> tempTable after every update??
>>
>> Thanks,
>> Vipul
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Regards,
> Vipul Rai
> www.vipulrai.me
> +91-8892598819
> <http://in.linkedin.com/in/vipulrai/>
>



-- 
Best Regards

Jeff Zhang


Re: SparkR DataFrame , Out of memory exception for very small file.

2015-11-23 Thread Jeff Zhang
If possible, could you share your code? What kind of operations are you
doing on the DataFrame?

On Mon, Nov 23, 2015 at 5:10 PM, Vipul Rai <vipulrai8...@gmail.com> wrote:

> Hi Zeff,
>
> Thanks for the reply, but could you tell me why is it taking so much time.
> What could be wrong , also when I remove the DataFrame from memory using
> rm().
> It does not clear the memory but the object is deleted.
>
> Also , What about the R functions which are not supported in SparkR.
> Like ddply ??
>
> How to access the nth ROW of SparkR DataFrame.
>
> ​Regards,
> Vipul​
>
> On 23 November 2015 at 14:25, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> >>> Do I need to create a new DataFrame for every update to the
>> DataFrame like
>> addition of new column or  need to update the original sales DataFrame.
>>
>> Yes, DataFrame is immutable, and every mutation of DataFrame will produce
>> a new DataFrame.
>>
>>
>>
>> On Mon, Nov 23, 2015 at 4:44 PM, Vipul Rai <vipulrai8...@gmail.com>
>> wrote:
>>
>>> Hello Rui,
>>>
>>> Sorry , What I meant was the resultant of the original dataframe to
>>> which a new column was added gives a new DataFrame.
>>>
>>> Please check this for more
>>>
>>> https://spark.apache.org/docs/1.5.1/api/R/index.html
>>>
>>> Check for
>>> WithColumn
>>>
>>>
>>> Thanks,
>>> Vipul
>>>
>>>
>>> On 23 November 2015 at 12:42, Sun, Rui <rui@intel.com> wrote:
>>>
>>>> Vipul,
>>>>
>>>> Not sure if I understand your question. DataFrame is immutable. You
>>>> can't update a DataFrame.
>>>>
>>>> Could you paste some log info for the OOM error?
>>>>
>>>> -Original Message-
>>>> From: vipulrai [mailto:vipulrai8...@gmail.com]
>>>> Sent: Friday, November 20, 2015 12:11 PM
>>>> To: user@spark.apache.org
>>>> Subject: SparkR DataFrame , Out of memory exception for very small file.
>>>>
>>>> Hi Users,
>>>>
>>>> I have a general doubt regarding DataFrames in SparkR.
>>>>
>>>> I am trying to read a file from Hive and it gets created as DataFrame.
>>>>
>>>> sqlContext <- sparkRHive.init(sc)
>>>>
>>>> #DF
>>>> sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true',
>>>>  source = "com.databricks.spark.csv",
>>>> inferSchema='true')
>>>>
>>>> registerTempTable(sales,"Sales")
>>>>
>>>> Do I need to create a new DataFrame for every update to the DataFrame
>>>> like addition of new column or  need to update the original sales 
>>>> DataFrame.
>>>>
>>>> sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as
>>>> a")
>>>>
>>>>
>>>> Please help me with this , as the orignal file is only 20MB but it
>>>> throws out of memory exception on a cluster of 4GB Master and Two workers
>>>> of 4GB each.
>>>>
>>>> Also, what is the logic with DataFrame do I need to register and drop
>>>> tempTable after every update??
>>>>
>>>> Thanks,
>>>> Vipul
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
>>>> additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>>
>>> --
>>> Regards,
>>> Vipul Rai
>>> www.vipulrai.me
>>> +91-8892598819
>>> <http://in.linkedin.com/in/vipulrai/>
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
> Regards,
> Vipul Rai
> www.vipulrai.me
> +91-8892598819
> <http://in.linkedin.com/in/vipulrai/>
>



-- 
Best Regards

Jeff Zhang


Re: Re: driver ClassNotFoundException when MySQL JDBC exceptions are thrown on executor

2015-11-19 Thread Jeff Zhang
duler.scala:874
> 15/10/16 17:42:41 INFO DAGScheduler: Submitting 1 missing tasks from
> ResultStage 2 (MapPartitionsRDD[5] at map at repro.scala:48)
> 15/10/16 17:42:41 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
> 15/10/16 17:42:41 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID
> 2, localhost, PROCESS_LOCAL, 1444 bytes)
> 15/10/16 17:42:41 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
> 15/10/16 17:42:41 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID
> 2)
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
> at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49)
> at repro.Repro$$anonfun$main$2.apply(repro.scala:48)
> at repro.Repro$$anonfun$main$2.apply(repro.scala:48)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to
> <http://scala.collection.abstractiterator.to/>(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
> at
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/10/16 17:42:41 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,
> localhost): com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
> at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49)
> at repro.Repro$$anonfun$main$2.apply

Re: has any spark write orc document

2015-11-19 Thread Jeff Zhang
It should be very similar to Parquet from the API perspective. Please refer
to this doc:

http://hortonworks.com/hadoop-tutorial/using-hive-with-orc-from-apache-spark/
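
In short, with a HiveContext the calls mirror the Parquet ones (the paths
are placeholders):

// write
df.write.format("orc").save("/data/people.orc")
// df.write.orc("/data/people.orc") should also work on 1.5+

// read
val people = sqlContext.read.format("orc").load("/data/people.orc")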


On Fri, Nov 20, 2015 at 2:59 PM, zhangjp <592426...@qq.com> wrote:

> Hi,
> has any spark write orc document which like the parquet document.
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
>
> Thanks
>



-- 
Best Regards

Jeff Zhang


Re: Spark build error

2015-11-17 Thread Jeff Zhang
ql.test.ExamplePointUDT in
>> class ExamplePointUDT
>> 
>> ""
>> []
>> List(Nil)
>>  // tree.tpe=org.apache.spark.sql.test.ExamplePointUDT
>> Block( // tree.tpe=Unit
>>   Apply( // def (): org.apache.spark.sql.types.UserDefinedType
>> in class UserDefinedType,
>> tree.tpe=org.apache.spark.sql.types.UserDefinedType
>> ExamplePointUDT.super."" // def ():
>> org.apache.spark.sql.types.UserDefinedType in class UserDefinedType,
>> tree.tpe=()org.apache.spark.sql.types.UserDefinedType
>> Nil
>>   )
>>   ()
>> )
>>   )
>> )
>> == Expanded type of tree ==
>> *ConstantType(*
>> *  value = Constant(org.apache.spark.sql.test.ExamplePoint)*
>> *)*
>> *uncaught exception during compilation: java.lang.AssertionError*
>>
>> *Error:scala: Error: assertion failed: List(object package$DebugNode,
>> object package$DebugNode)*
>> *java.lang.AssertionError: assertion failed: List(object
>> package$DebugNode, object package$DebugNode)*
>> at scala.reflect.internal.Symbols$Symbol.suchThat(Symbols.scala:1678)
>> at
>> scala.reflect.internal.Symbols$ClassSymbol.companionModule0(Symbols.scala:2988)
>> at
>> scala.reflect.internal.Symbols$ClassSymbol.companionModule(Symbols.scala:2991)
>> at
>> scala.tools.nsc.backend.jvm.GenASM$JPlainBuilder.genClass(GenASM.scala:1371)
>> at scala.tools.nsc.backend.jvm.GenASM$AsmPhase.run(GenASM.scala:120)
>> at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1583)
>> at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1557)
>> at scala.tools.nsc.Global$Run.compileSources(Global.scala:1553)
>> at scala.tools.nsc.Global$Run.compile(Global.scala:1662)
>> at xsbt.CachedCompiler0.run(CompilerInterface.scala:126)
>> at xsbt.CachedCompiler0.run(CompilerInterface.scala:102)
>> at xsbt.CompilerInterface.run(CompilerInterface.scala:27)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at sbt.compiler.AnalyzingCompiler.call(AnalyzingCompiler.scala:102)
>> at sbt.compiler.AnalyzingCompiler.compile(AnalyzingCompiler.scala:48)
>> at sbt.compiler.AnalyzingCompiler.compile(AnalyzingCompiler.scala:41)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3$$anonfun$apply$1.apply$mcV$sp(AggressiveCompile.scala:106)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3$$anonfun$apply$1.apply(AggressiveCompile.scala:106)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3$$anonfun$apply$1.apply(AggressiveCompile.scala:106)
>> at
>> sbt.compiler.AggressiveCompile.sbt$compiler$AggressiveCompile$$timed(AggressiveCompile.scala:179)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3.apply(AggressiveCompile.scala:105)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1$$anonfun$apply$3.apply(AggressiveCompile.scala:102)
>> at scala.Option.foreach(Option.scala:245)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1.apply(AggressiveCompile.scala:102)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6$$anonfun$compileScala$1$1.apply(AggressiveCompile.scala:102)
>> at scala.Option.foreach(Option.scala:245)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6.compileScala$1(AggressiveCompile.scala:102)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6.apply(AggressiveCompile.scala:151)
>> at
>> sbt.compiler.AggressiveCompile$$anonfun$6.apply(AggressiveCompile.scala:89)
>> at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:40)
>> at sbt.inc.IncrementalCompile$$anonfun$doCompile$1.apply(Compile.scala:38)
>> at sbt.inc.IncrementalCommon.cycle(Incremental.scala:103)
>> at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:39)
>> at sbt.inc.Incremental$$anonfun$1.apply(Incremental.scala:38)
>> at sbt.inc.Incremental$.manageClassfiles(Incremental.scala:69)
>> at sbt.inc.Incremental$.compile(Incremental.scala:38)
>> at sbt.inc.IncrementalCompile$.apply(Compile.scala:28)
>> at sbt.compiler.AggressiveCompile.compile2(AggressiveCompile.scala:170)
>> at sbt.compiler.AggressiveCompile.compile1(AggressiveCompile.scala:73)
>> at
>> org.jetbrains.jps.incremental.scala.local.SbtCompiler.compile(SbtCompiler.scala:66)
>> at
>> org.jetbrains.jps.incremental.scala.local.LocalServer.compile(LocalServer.scala:26)
>> at org.jetbrains.jps.incremental.scala.remote.Main$.make(Main.scala:62)
>> at
>> org.jetbrains.jps.incremental.scala.remote.Main$.nailMain(Main.scala:20)
>> at org.jetbrains.jps.incremental.scala.remote.Main.nailMain(Main.scala)
>> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at com.martiansoftware.nailgun.NGSession.run(NGSession.java:319)
>>
>> I just highlighted some error message that I think important as *bold
>> and red.*
>>
>> This really bothered me for several days, I don't know how to get
>> through. Any suggestions? Thanks.
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: No spark examples jar in maven repository after 1.1.1 ?

2015-11-16 Thread Jeff Zhang
But it may be useful for users to browse the example source code in an IDE
just by adding it as a Maven dependency. Otherwise users have to either
download the source code or check it on GitHub.

On Mon, Nov 16, 2015 at 5:32 PM, Sean Owen <so...@cloudera.com> wrote:

> I think because they're not a library? they're example code, not
> something you build an app on.
>
> On Mon, Nov 16, 2015 at 9:27 AM, Jeff Zhang <zjf...@gmail.com> wrote:
> > I don't find spark examples jar in maven repository after 1.1.1. Any
> reason
> > for that ?
> >
> > http://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.10
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
>



-- 
Best Regards

Jeff Zhang


No spark examples jar in maven repository after 1.1.1 ?

2015-11-16 Thread Jeff Zhang
I don't find the spark-examples jar in the Maven repository after 1.1.1.
Any reason for that?

http://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.10


-- 
Best Regards

Jeff Zhang


Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?

2015-11-12 Thread Jeff Zhang
I didn't notice that I can pass comma-separated paths to the existing API
(SparkContext#textFile), so there is no need for a new API. Thanks all.
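
For the record, this is all that's needed:

val rdd = sc.textFile("hdfs:///data/file1.txt,hdfs:///data/file2.txt")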



On Thu, Nov 12, 2015 at 10:24 AM, Jeff Zhang <zjf...@gmail.com> wrote:

> Hi Pradeep
>
> ≥≥≥ Looks like what I was suggesting doesn't work. :/
> I guess you mean put comma separated path into one string and pass it
> to existing API (SparkContext#textFile). It should not work. I suggest to
> create new api SparkContext#textFiles to accept an array of string. I have
> already implemented a simple patch and it works.
>
>
>
>
> On Thu, Nov 12, 2015 at 10:17 AM, Pradeep Gollakota <pradeep...@gmail.com>
> wrote:
>
>> Looks like what I was suggesting doesn't work. :/
>>
>> On Wed, Nov 11, 2015 at 4:49 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Yes, that's what I suggest. TextInputFormat support multiple inputs. So
>>> in spark side, we just need to provide API to for that.
>>>
>>> On Thu, Nov 12, 2015 at 8:45 AM, Pradeep Gollakota <pradeep...@gmail.com
>>> > wrote:
>>>
>>>> IIRC, TextInputFormat supports an input path that is a comma separated
>>>> list. I haven't tried this, but I think you should just be able to do
>>>> sc.textFile("file1,file2,...")
>>>>
>>>> On Wed, Nov 11, 2015 at 4:30 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> I know these workaround, but wouldn't it be more convenient and
>>>>> straightforward to use SparkContext#textFiles ?
>>>>>
>>>>> On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra <m...@clearstorydata.com
>>>>> > wrote:
>>>>>
>>>>>> For more than a small number of files, you'd be better off using
>>>>>> SparkContext#union instead of RDD#union.  That will avoid building up a
>>>>>> lengthy lineage.
>>>>>>
>>>>>> On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky <joder...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Jeff,
>>>>>>> Do you mean reading from multiple text files? In that case, as a
>>>>>>> workaround, you can use the RDD#union() (or ++) method to concatenate
>>>>>>> multiple rdds. For example:
>>>>>>>
>>>>>>> val lines1 = sc.textFile("file1")
>>>>>>> val lines2 = sc.textFile("file2")
>>>>>>>
>>>>>>> val rdd = lines1 union lines2
>>>>>>>
>>>>>>> regards,
>>>>>>> --Jakob
>>>>>>>
>>>>>>> On 11 November 2015 at 01:20, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Although user can use the hdfs glob syntax to support multiple
>>>>>>>> inputs. But sometimes, it is not convenient to do that. Not sure why
>>>>>>>> there's no api of SparkContext#textFiles. It should be easy to 
>>>>>>>> implement
>>>>>>>> that. I'd love to create a ticket and contribute for that if there's no
>>>>>>>> other consideration that I don't know.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Jeff Zhang
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Best Regards

Jeff Zhang


Why there's no api for SparkContext#textFiles to support multiple inputs ?

2015-11-11 Thread Jeff Zhang
Users can use the HDFS glob syntax to specify multiple inputs, but
sometimes that is not convenient. I'm not sure why there's no
SparkContext#textFiles API; it should be easy to implement. I'd love to
create a ticket and contribute it if there's no other consideration that
I'm not aware of.

-- 
Best Regards

Jeff Zhang


Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?

2015-11-11 Thread Jeff Zhang
I know these workarounds, but wouldn't it be more convenient and
straightforward to have SparkContext#textFiles?

On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> For more than a small number of files, you'd be better off using
> SparkContext#union instead of RDD#union.  That will avoid building up a
> lengthy lineage.
>
> On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky <joder...@gmail.com>
> wrote:
>
>> Hey Jeff,
>> Do you mean reading from multiple text files? In that case, as a
>> workaround, you can use the RDD#union() (or ++) method to concatenate
>> multiple rdds. For example:
>>
>> val lines1 = sc.textFile("file1")
>> val lines2 = sc.textFile("file2")
>>
>> val rdd = lines1 union lines2
>>
>> regards,
>> --Jakob
>>
>> On 11 November 2015 at 01:20, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Although user can use the hdfs glob syntax to support multiple inputs.
>>> But sometimes, it is not convenient to do that. Not sure why there's no api
>>> of SparkContext#textFiles. It should be easy to implement that. I'd love to
>>> create a ticket and contribute for that if there's no other consideration
>>> that I don't know.
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>


-- 
Best Regards

Jeff Zhang


Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?

2015-11-11 Thread Jeff Zhang
Yes, that's what I'm suggesting. TextInputFormat supports multiple inputs,
so on the Spark side we just need to provide an API for that.

On Thu, Nov 12, 2015 at 8:45 AM, Pradeep Gollakota <pradeep...@gmail.com>
wrote:

> IIRC, TextInputFormat supports an input path that is a comma separated
> list. I haven't tried this, but I think you should just be able to do
> sc.textFile("file1,file2,...")
>
> On Wed, Nov 11, 2015 at 4:30 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> I know these workaround, but wouldn't it be more convenient and
>> straightforward to use SparkContext#textFiles ?
>>
>> On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra <m...@clearstorydata.com>
>> wrote:
>>
>>> For more than a small number of files, you'd be better off using
>>> SparkContext#union instead of RDD#union.  That will avoid building up a
>>> lengthy lineage.
>>>
>>> On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky <joder...@gmail.com>
>>> wrote:
>>>
>>>> Hey Jeff,
>>>> Do you mean reading from multiple text files? In that case, as a
>>>> workaround, you can use the RDD#union() (or ++) method to concatenate
>>>> multiple rdds. For example:
>>>>
>>>> val lines1 = sc.textFile("file1")
>>>> val lines2 = sc.textFile("file2")
>>>>
>>>> val rdd = lines1 union lines2
>>>>
>>>> regards,
>>>> --Jakob
>>>>
>>>> On 11 November 2015 at 01:20, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> Although user can use the hdfs glob syntax to support multiple inputs.
>>>>> But sometimes, it is not convenient to do that. Not sure why there's no 
>>>>> api
>>>>> of SparkContext#textFiles. It should be easy to implement that. I'd love 
>>>>> to
>>>>> create a ticket and contribute for that if there's no other consideration
>>>>> that I don't know.
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: ResultStage's parent stages only ShuffleMapStages?

2015-11-06 Thread Jeff Zhang
Right, there are only 2 kinds of stage: ResultStage and ShuffleMapStage.
A ShuffleMapStage writes shuffle output for downstream consumption, while a
ResultStage doesn't need to do that.

I guess you may be conflating these concepts with Map/Reduce. A
ShuffleMapStage can play the role of either a map or a reduce phase, as
long as it produces intermediate data for downstream consumption.
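
A tiny example, for the intuition:

sc.textFile("hdfs:///logs")       // this and the map below end up in a ShuffleMapStage,
  .map(line => (line.length, 1))  // because reduceByKey needs a shuffle
  .reduceByKey(_ + _)
  .collect()                      // the final stage producing the job result is the ResultStage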




On Fri, Nov 6, 2015 at 4:15 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Just to make sure that what I see in the code and think I understand
> is indeed correct...
>
> When a job is submitted to DAGScheduler, it creates a new ResultStage
> that in turn queries for the parent stages of itself given the RDD
> (using `getParentStagesAndId` in `newResultStage`).
>
> Are a ResultStage's parent stages only ShuffleMapStages?
>
> Pozdrawiam,
> Jacek
>
> --
> Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


How to use And Operator in filter (PySpark)

2015-10-21 Thread Jeff Zhang
I can do it with the Scala API, but I'm not sure what the syntax is in
PySpark (I didn't find it in the Python API docs).

Here's what I tried, both failed

>>> df.filter(df.age>3 & df.name=="Andy").collect()
>>> df.filter(df.age>3 and df.name=="Andy").collect()
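
For reference, the Scala form I had in mind is roughly:

df.filter(df("age") > 3 && df("name") === "Andy").collect()
// in PySpark the equivalent apparently needs & with parenthesized
// conditions, since Python's `and` can't be overloaded on Columns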

-- 
Best Regards

Jeff Zhang


Re: Location preferences in pyspark?

2015-10-20 Thread Jeff Zhang
Correct, I don't think there is. You can use SparkContext.parallelize() to
make an RDD from a list, but location preferences are not supported there yet.
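
For reference, the Scala method the question refers to looks like this
(the host names are placeholders):

val rdd = sc.makeRDD(Seq(
  ("record-1", Seq("host1.example.com")),
  ("record-2", Seq("host2.example.com"))))  // RDD[String] with per-element preferred hosts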

On Sat, Oct 17, 2015 at 8:42 AM, Philip Weaver <philip.wea...@gmail.com>
wrote:

> I believe what I want is the exact functionality provided by
> SparkContext.makeRDD in Scala. For each element in the RDD, I want specify
> a list of preferred hosts for processing that element.
>
> It looks like this method only exists in Scala, and as far as I can tell
> there is no similar functionality available in python. Is this true?
>
> - Philip
>
>


-- 
Best Regards

Jeff Zhang


Re: Reading JSON in Pyspark throws scala.MatchError

2015-10-20 Thread Jeff Zhang
BTW, I think the JSON parser should verify the JSON format, at least when
inferring the schema.
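
For reference, each record needs to be a single line of valid JSON, e.g.
the sample from this thread flattened to one line:

{"dataunit": {"page_view": {"nonce": 438058072, "person": {"user_id": 5846}, "page": {"url": "http://mysite.com/blog"}}}, "pedigree": {"true_as_of_secs": 1438627992}}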

On Wed, Oct 21, 2015 at 12:59 PM, Jeff Zhang <zjf...@gmail.com> wrote:

> I think this is due to the json file format.  DataFrame can only accept
> json file with one valid record per line.  Multiple line per record is
> invalid for DataFrame.
>
>
> On Tue, Oct 6, 2015 at 2:48 AM, Davies Liu <dav...@databricks.com> wrote:
>
>> Could you create a JIRA to track this bug?
>>
>> On Fri, Oct 2, 2015 at 1:42 PM, balajikvijayan
>> <balaji.k.vija...@gmail.com> wrote:
>> > Running Windows 8.1, Python 2.7.x, Scala 2.10.5, Spark 1.4.1.
>> >
>> > I'm trying to read in a large quantity of json data in a couple of
>> files and
>> > I receive a scala.MatchError when I do so. Json, Python and stack trace
>> all
>> > shown below.
>> >
>> > Json:
>> >
>> > {
>> > "dataunit": {
>> > "page_view": {
>> > "nonce": 438058072,
>> > "person": {
>> > "user_id": 5846
>> > },
>> > "page": {
>> > "url": "http://mysite.com/blog;
>> > }
>> > }
>> > },
>> > "pedigree": {
>> > "true_as_of_secs": 1438627992
>> > }
>> > }
>> >
>> > Python:
>> >
>> > import pyspark
>> > sc = pyspark.SparkContext()
>> > sqlContext = pyspark.SQLContext(sc)
>> > pageviews = sqlContext.read.json("[Path to folder containing file with
>> above
>> > json]")
>> > pageviews.collect()
>> >
>> > Stack Trace:
>> > Py4JJavaError: An error occurred while calling
>> > z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>> > : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 1
>> > in stage 32.0 failed 1 times, most recent failure: Lost task 1.0 in
>> stage
>> > 32.0 (TID 133, localhost): scala.MatchError:
>> > (VALUE_STRING,ArrayType(StructType(),true)) (of class scala.Tuple2)
>> > at
>> >
>> org.apache.spark.sql.json.JacksonParser$.convertField(JacksonParser.scala:49)
>> > at
>> >
>> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:201)
>> > at
>> >
>> org.apache.spark.sql.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:193)
>> > at
>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> > at
>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> > at
>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> > at
>> >
>> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:116)
>> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> > at
>> >
>> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:111)
>> > at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>> > at
>> >
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> > at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> > at scala.collection.TraversableOnce$class.to
>> (TraversableOnce.scala:273)
>> > at
>> >
>> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:111)
>> > at
>> >
>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>> > at
>> >
>> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:111)
>> > at
>> >
>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>> > at
>> >
>> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:111)
>> > at
>> >
>> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
>> > at
>> >
>> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
>> > at
>> >
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkCo

Re: Reading JSON in Pyspark throws scala.MatchError

2015-10-20 Thread Jeff Zhang
hread.run(Thread.java:745)
> >
> > Driver stacktrace:
> > at
> > org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> > at
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> > at
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> > at
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > at
> >
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> > at
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> > at
> >
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> > at scala.Option.foreach(Option.scala:236)
> > at
> >
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> > at
> >
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
> > at
> >
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
> > at
> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> >
> > It seems like this issue has been resolved in scala per  SPARK-3390
> > <https://issues.apache.org/jira/browse/SPARK-3390>  ; any thoughts on
> the
> > root cause of this in pyspark?
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Reading-JSON-in-Pyspark-throws-scala-MatchError-tp24911.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: pyspark groupbykey throwing error: unpack requires a string argument of length 4

2015-10-19 Thread Jeff Zhang
Stacktrace would be helpful if you can provide that.



On Mon, Oct 19, 2015 at 1:42 PM, fahad shah <sfaha...@gmail.com> wrote:

>  Hi
>
> I am trying to do pair rdd's, group by the key assign id based on key.
> I am using Pyspark with spark 1.3, for some reason, I am getting this
> error that I am unable to figure out - any help much appreciated.
>
> Things I tried (but to no effect),
>
> 1. make sure I am not doing any conversions on the strings
> 2. make sure that the fields used in the key are all there  and not
> empty string (or else I toss the row out)
>
> My code is along following lines (split is using stringio to parse
> csv, header removes the header row and parse_train is putting the 54
> fields into named tuple after whitespace/quote removal):
>
> #Error for string argument is thrown on the BB.take(1) where the
> groupbykey is evaluated
>
> A = sc.textFile("train.csv").filter(lambda x:not
> isHeader(x)).map(split).map(parse_train).filter(lambda x: not x is
> None)
>
> A.count()
>
> B = A.map(lambda k:
>
> ((k.srch_destination_id,k.srch_length_of_stay,k.srch_booking_window,k.srch_adults_count,
>  k.srch_children_count,k.srch_room_count),
> (k[0:54])))
> BB = B.groupByKey()
> BB.take(1)
>
>
> best fahad
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Setting executors per worker - Standalone

2015-09-28 Thread Jeff Zhang
use "--executor-cores 1" you will get 4 executors per worker since you have
4 cores per worker
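
Something along these lines (the memory value is only an example; make sure
4 executors x executor memory fits within each worker's 12GB):

spark-submit --master spark://<master>:7077 \
  --executor-cores 1 \
  --executor-memory 2g \
  --total-executor-cores 40 \
  your-app.jar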



On Tue, Sep 29, 2015 at 8:24 AM, James Pirz <james.p...@gmail.com> wrote:

> Hi,
>
> I am using speak 1.5 (standalone mode) on a cluster with 10 nodes while
> each machine has 12GB of RAM and 4 cores. On each machine I have one worker
> which is running one executor that grabs all 4 cores. I am interested to
> check the performance with "one worker but 4 executors per machine - each
> with one core".
>
> I can see that "running multiple executors per worker in Standalone mode"
> is possible based on the closed issue:
>
> https://issues.apache.org/jira/browse/SPARK-1706
>
> But I can not find a way to do that. "SPARK_EXECUTOR_INSTANCES" is only
> available for the Yarn mode, and in the standalone mode I can just set
> "SPARK_WORKER_INSTANCES" and "SPARK_WORKER_CORES" and "SPARK_WORKER_MEMORY".
>
> Any hint or suggestion would be great.
>
>


-- 
Best Regards

Jeff Zhang


Task serialization error for mllib.MovieLensALS

2015-09-09 Thread Jeff Zhang
I ran MovieLensALS but hit the following error. The weird thing is that
this issue only appears under OpenJDK. This is based on 1.5. I found
several related tickets; has anyone else met the same issue and does anyone
know the solution? Thanks

https://issues.apache.org/jira/browse/SPARK-4672
https://issues.apache.org/jira/browse/SPARK-4838



Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task serialization failed: java.lang.StackOverflowError
java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1841)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1534)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
scala.collection.immutable.$colon$colon.writeObject(List.scala:379)

-- 
Best Regards

Jeff Zhang


Re: Event logging not working when worker machine terminated

2015-09-08 Thread Jeff Zhang
Which cluster mode do you use? Standalone/YARN/Mesos?


On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch <dar...@darose.net>
wrote:

> Our Spark cluster is configured to write application history event logging
> to a directory on HDFS.  This all works fine.  (I've tested it with Spark
> shell.)
>
> However, on a large, long-running job that we ran tonight, one of our
> machines at the cloud provider had issues and had to be terminated and
> replaced in the middle of the job.
>
> The job completed correctly, and shows in state FINISHED in the "Completed
> Applications" section of the Spark GUI.  However, when I try to look at the
> application's history, the GUI says "Application history not found" and
> "Application ... is still in progress".
>
> The reason appears to be the machine that was terminated.  When I click on
> the executor list for that job, Spark is showing the executor from the
> terminated machine as still in state RUNNING.
>
> Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.
>
> Thanks,
>
> DR
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Submitted applications does not run.

2015-09-01 Thread Jeff Zhang
This is the master log. There's no worker registration info in it, which
means the workers may not have started properly. Please check the log files
with apache.spark.deploy.worker in the file name.



On Tue, Sep 1, 2015 at 2:55 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
wrote:

> I cannot see anything abnormal in logs. What would be the reason for not
> availability of executors?
>
> On 1 September 2015 at 12:24, Madawa Soysa <madawa...@cse.mrt.ac.lk>
> wrote:
>
>> Following are the logs available. Please find the attached.
>>
>> On 1 September 2015 at 12:18, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> It's in SPARK_HOME/logs
>>>
>>> Or you can check the spark web ui. http://[master-machine]:8080
>>>
>>>
>>> On Tue, Sep 1, 2015 at 2:44 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
>>> wrote:
>>>
>>>> How do I check worker logs? SPARK_HOME/work folder does not exist. I am
>>>> using the spark standalone mode.
>>>>
>>>> On 1 September 2015 at 12:05, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> No executors ? Please check the worker logs if you are using spark
>>>>> standalone mode.
>>>>>
>>>>> On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have successfully submitted some jobs to spark master. But the jobs
>>>>>> won't progress and not finishing. Please see the attached screenshot. 
>>>>>> These
>>>>>> are fairly very small jobs and this shouldn't take more than a minute to
>>>>>> finish.
>>>>>>
>>>>>> I'm new to spark and any help would be appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>> Madawa.
>>>>>>
>>>>>>
>>>>>> -
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> *_**Madawa Soysa*
>>>>
>>>> Undergraduate,
>>>>
>>>> Department of Computer Science and Engineering,
>>>>
>>>> University of Moratuwa.
>>>>
>>>>
>>>> Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email:
>>>> madawa...@cse.mrt.ac.lk
>>>> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter
>>>> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/>
>>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>>
>> --
>>
>> *_**Madawa Soysa*
>>
>> Undergraduate,
>>
>> Department of Computer Science and Engineering,
>>
>> University of Moratuwa.
>>
>>
>> Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email:
>> madawa...@cse.mrt.ac.lk
>> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter
>> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/>
>>
>
>
>
> --
>
> *_**Madawa Soysa*
>
> Undergraduate,
>
> Department of Computer Science and Engineering,
>
> University of Moratuwa.
>
>
> Mobile: +94 71 461 6050 <%2B94%2075%20812%200726> | Email:
> madawa...@cse.mrt.ac.lk
> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter
> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/>
>



-- 
Best Regards

Jeff Zhang


Re: Submitted applications does not run.

2015-09-01 Thread Jeff Zhang
It's in SPARK_HOME/logs

Or you can check the spark web ui. http://[master-machine]:8080


On Tue, Sep 1, 2015 at 2:44 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
wrote:

> How do I check worker logs? SPARK_HOME/work folder does not exist. I am
> using the spark standalone mode.
>
> On 1 September 2015 at 12:05, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> No executors ? Please check the worker logs if you are using spark
>> standalone mode.
>>
>> On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
>> wrote:
>>
>>> Hi All,
>>>
>>> I have successfully submitted some jobs to spark master. But the jobs
>>> won't progress and not finishing. Please see the attached screenshot. These
>>> are fairly very small jobs and this shouldn't take more than a minute to
>>> finish.
>>>
>>> I'm new to spark and any help would be appreciated.
>>>
>>> Thanks,
>>> Madawa.
>>>
>>>
>>> -----
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
>
> *_**Madawa Soysa*
>
> Undergraduate,
>
> Department of Computer Science and Engineering,
>
> University of Moratuwa.
>
>
> Mobile: +94 71 461 6050 | Email:
> madawa...@cse.mrt.ac.lk
> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter
> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/>
>



-- 
Best Regards

Jeff Zhang


Re: Submitted applications does not run.

2015-09-01 Thread Jeff Zhang
No executors ? Please check the worker logs if you are using spark
standalone mode.

On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
wrote:

> Hi All,
>
> I have successfully submitted some jobs to spark master. But the jobs
> won't progress and not finishing. Please see the attached screenshot. These
> are fairly very small jobs and this shouldn't take more than a minute to
> finish.
>
> I'm new to spark and any help would be appreciated.
>
> Thanks,
> Madawa.
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Best Regards

Jeff Zhang


Re: cached data between jobs

2015-09-01 Thread Jeff Zhang
Hi Eric,

If the 2 jobs share the same parent stages, these stages can be skipped for
the second job.

Here's one simple example:

val rdd1 = sc.parallelize(1 to 10).map(e=>(e,e))
val rdd2 = rdd1.groupByKey()
rdd2.map(e=>e._1).collect() foreach println
rdd2.map(e=> (e._1, e._2.size)).collect foreach println

Obviously, there are 2 jobs and both of them have 2 stages. Luckily, these 2
jobs share the same stage (the first stage of each job). Although you don't
cache this data explicitly, once a stage is completed it is marked as
available and can be reused by other jobs, so the second job only needs to
run one stage.
You should be able to see the skipped stage in the Spark job UI.
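
If you want the reuse to be explicit instead of relying on the completed
shuffle output, a minimal sketch using the same example:

val rdd2 = rdd1.groupByKey().cache()   // later jobs read rdd2 from the cache instead of re-running its stage
rdd2.map(e => e._1).collect() foreach println
rdd2.map(e => (e._1, e._2.size)).collect() foreach println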




On Wed, Sep 2, 2015 at 12:53 AM, Eric Walker <eric.wal...@gmail.com> wrote:

> Hi,
>
> I'm noticing that a 30 minute job that was initially IO-bound may not be
> during subsequent runs.  Is there some kind of between-job caching that
> happens in Spark or in Linux that outlives jobs and that might be making
> subsequent runs faster?  If so, is there a way to avoid the caching in
> order to get a better sense of the worst-case scenario?
>
> (It's also possible that I've simply changed something that made things
> faster.)
>
> Eric
>
>


-- 
Best Regards

Jeff Zhang


Re: Submitted applications does not run.

2015-09-01 Thread Jeff Zhang
Did you start the Spark cluster using the command sbin/start-all.sh?
You should have 2 log files under the logs folder if it is a single-node cluster, like
the following:

spark-jzhang-org.apache.spark.deploy.master.Master-1-jzhangMBPr.local.out
spark-jzhang-org.apache.spark.deploy.worker.Worker-1-jzhangMBPr.local.out



On Tue, Sep 1, 2015 at 4:01 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
wrote:

> There are no logs which includes apache.spark.deploy.worker in file name
> in the SPARK_HOME/logs folder.
>
> On 1 September 2015 at 13:00, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> This is master log. There's no worker registration info in the log. That
>> means the worker may not start properly. Please check the log file
>> with apache.spark.deploy.worker in file name.
>>
>>
>>
>> On Tue, Sep 1, 2015 at 2:55 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
>> wrote:
>>
>>> I cannot see anything abnormal in logs. What would be the reason for not
>>> availability of executors?
>>>
>>> On 1 September 2015 at 12:24, Madawa Soysa <madawa...@cse.mrt.ac.lk>
>>> wrote:
>>>
>>>> Following are the logs available. Please find the attached.
>>>>
>>>> On 1 September 2015 at 12:18, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> It's in SPARK_HOME/logs
>>>>>
>>>>> Or you can check the spark web ui. http://[master-machine]:8080
>>>>>
>>>>>
>>>>> On Tue, Sep 1, 2015 at 2:44 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
>>>>> wrote:
>>>>>
>>>>>> How do I check worker logs? SPARK_HOME/work folder does not exist. I
>>>>>> am using the spark standalone mode.
>>>>>>
>>>>>> On 1 September 2015 at 12:05, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>
>>>>>>> No executors ? Please check the worker logs if you are using spark
>>>>>>> standalone mode.
>>>>>>>
>>>>>>> On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa <
>>>>>>> madawa...@cse.mrt.ac.lk> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I have successfully submitted some jobs to spark master. But the
>>>>>>>> jobs won't progress and not finishing. Please see the attached 
>>>>>>>> screenshot.
>>>>>>>> These are fairly very small jobs and this shouldn't take more than a 
>>>>>>>> minute
>>>>>>>> to finish.
>>>>>>>>
>>>>>>>> I'm new to spark and any help would be appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Madawa.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -
>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *_**Madawa Soysa*
>>>>>>
>>>>>> Undergraduate,
>>>>>>
>>>>>> Department of Computer Science and Engineering,
>>>>>>
>>>>>> University of Moratuwa.
>>>>>>
>>>>>>
>>>>>> Mobile: +94 71 461 6050 | Email:
>>>>>> madawa...@cse.mrt.ac.lk
>>>>>> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter
>>>>>> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> *_**Madawa Soysa*
>>>>
>>>> Undergraduate,
>>>>
>>>> Department of Computer Science and Engineering,
>>>>
>>>> University of Moratuwa.
>>>>
>>>>
>>>> Mobile: +94 71 461 6050 | Email:
>>>> madawa...@cse.mrt.ac.lk
>>>> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter
>>>> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/>
>>>>
>>>
>>>
>>>
>>> --
>>>
>>> *_**Madawa Soysa*
>>>
>>> Undergraduate,
>>>
>>> Department of Computer Science and Engineering,
>>>
>>> University of Moratuwa.
>>>
>>>
>>> Mobile: +94 71 461 6050 | Email:
>>> madawa...@cse.mrt.ac.lk
>>> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter
>>> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/>
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>
> --
>
> *_**Madawa Soysa*
>
> Undergraduate,
>
> Department of Computer Science and Engineering,
>
> University of Moratuwa.
>
>
> Mobile: +94 71 461 6050 | Email:
> madawa...@cse.mrt.ac.lk
> LinkedIn <http://lk.linkedin.com/in/madawasoysa> | Twitter
> <https://twitter.com/madawa_rc> | Tumblr <http://madawas.tumblr.com/>
>



-- 
Best Regards

Jeff Zhang


Re: Submitted applications does not run.

2015-09-01 Thread Jeff Zhang
You need to be able to ssh to localhost without a password; please check
this blog:

http://hortonworks.com/kb/generating-ssh-keys-for-passwordless-login/
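
A minimal sketch of the usual steps (standard OpenSSH commands; note that the
"Connection refused" in the quoted error below also suggests sshd itself may not
be running on port 22):

ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
ssh localhost    # should now log in without a password prompt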



On Tue, Sep 1, 2015 at 4:31 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
wrote:

> I used ./sbin/start-master.sh
>
> When I used ./sbin/start-all.sh the start fails. I get the following error.
>
> failed to launch org.apache.spark.deploy.master.Master:
> localhost: ssh: connect to host localhost port 22: Connection refused
>
> On 1 September 2015 at 13:41, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> Did you start spark cluster using command sbin/start-all.sh ?
>> You should have 2 log files under folder if it is single-node cluster.
>> Like the following
>>
>> spark-jzhang-org.apache.spark.deploy.master.Master-1-jzhangMBPr.local.out
>> spark-jzhang-org.apache.spark.deploy.worker.Worker-1-jzhangMBPr.local.out
>>
>>
>>
>> On Tue, Sep 1, 2015 at 4:01 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
>> wrote:
>>
>>> There are no logs which includes apache.spark.deploy.worker in file
>>> name in the SPARK_HOME/logs folder.
>>>
>>> On 1 September 2015 at 13:00, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> This is master log. There's no worker registration info in the log.
>>>> That means the worker may not start properly. Please check the log file
>>>> with apache.spark.deploy.worker in file name.
>>>>
>>>>
>>>>
>>>> On Tue, Sep 1, 2015 at 2:55 PM, Madawa Soysa <madawa...@cse.mrt.ac.lk>
>>>> wrote:
>>>>
>>>>> I cannot see anything abnormal in logs. What would be the reason for
>>>>> not availability of executors?
>>>>>
>>>>> On 1 September 2015 at 12:24, Madawa Soysa <madawa...@cse.mrt.ac.lk>
>>>>> wrote:
>>>>>
>>>>>> Following are the logs available. Please find the attached.
>>>>>>
>>>>>> On 1 September 2015 at 12:18, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>
>>>>>>> It's in SPARK_HOME/logs
>>>>>>>
>>>>>>> Or you can check the spark web ui. http://[master-machine]:8080
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 1, 2015 at 2:44 PM, Madawa Soysa <
>>>>>>> madawa...@cse.mrt.ac.lk> wrote:
>>>>>>>
>>>>>>>> How do I check worker logs? SPARK_HOME/work folder does not exist.
>>>>>>>> I am using the spark standalone mode.
>>>>>>>>
>>>>>>>> On 1 September 2015 at 12:05, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> No executors ? Please check the worker logs if you are using spark
>>>>>>>>> standalone mode.
>>>>>>>>>
>>>>>>>>> On Tue, Sep 1, 2015 at 2:17 PM, Madawa Soysa <
>>>>>>>>> madawa...@cse.mrt.ac.lk> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have successfully submitted some jobs to spark master. But the
>>>>>>>>>> jobs won't progress and not finishing. Please see the attached 
>>>>>>>>>> screenshot.
>>>>>>>>>> These are fairly very small jobs and this shouldn't take more than a 
>>>>>>>>>> minute
>>>>>>>>>> to finish.
>>>>>>>>>>
>>>>>>>>>> I'm new to spark and any help would be appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Madawa.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -
>>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best Regards

Re: Exception throws when running spark pi in Intellij Idea that scala.collection.Seq is not found

2015-08-25 Thread Jeff Zhang
As I remember, you also need to change the guava and jetty related dependencies
to compile scope if you want to run SparkPi in IntelliJ.
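
For illustration, a minimal sketch of the scope change Hemant describes below,
written as the pom.xml equivalent (the coordinates are the standard Scala ones,
not copied from the Spark pom; you can just as well change the scope in the
Open Module Settings dialog):

<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.10.4</version>
  <scope>compile</scope>  <!-- was provided or test -->
</dependency>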



On Tue, Aug 25, 2015 at 3:15 PM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Go to the module settings of the project and in the dependencies section
 check the scope of scala jars. It would be either Test or Provided. Change
 it to compile and it should work. Check the following link to understand
 more about scope of modules:


 https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html



 On Tue, Aug 25, 2015 at 12:18 PM, Todd bit1...@163.com wrote:

 I cloned the code from https://github.com/apache/spark to my machine. It
 can compile successfully,
 But when I run the sparkpi, it throws an exception below complaining the
 scala.collection.Seq is not found.
 I have installed scala2.10.4 in my machine, and use the default profiles:
 window,scala2.10,maven-3,test-java-home.
 In Idea, I can find that the Seq class is on my classpath:





 Exception in thread main java.lang.NoClassDefFoundError:
 scala/collection/Seq
 at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
 Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 6 more





-- 
Best Regards

Jeff Zhang


DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Jeff Zhang
It's weird to me that the simple show function costs 2 Spark jobs.
DataFrame#explain shows it is a very simple operation; not sure why it needs 2
jobs.

== Parsed Logical Plan ==
Relation[age#0L,name#1]
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Analyzed Logical Plan ==
age: bigint, name: string
Relation[age#0L,name#1]
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Optimized Logical Plan ==
Relation[age#0L,name#1]
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]

== Physical Plan ==
Scan
JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]



-- 
Best Regards

Jeff Zhang


Re: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Jeff Zhang
Hi Cheng,

I know that sqlContext.read will trigger one Spark job to infer the schema.
What I mean is that DataFrame#show costs 2 Spark jobs, so overall it would cost 3
jobs.

Here's the command I use:

 val df =
sqlContext.read.json("file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")
   // trigger one spark job to infer schema
 df.show()  // trigger 2 spark jobs which is weird
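
For reference, a minimal sketch of Hao's suggestion quoted below, supplying the
schema up front so the inference job is skipped (the two field names are assumed
from people.json):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("age", LongType, nullable = true),
  StructField("name", StringType, nullable = true)))
val df = sqlContext.read.schema(schema)
  .json("file:///Users/hadoop/github/spark/examples/src/main/resources/people.json")  // no inference job
df.show()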




On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote:

 The first job is to infer the json schema, and the second one is what you
 mean of the query.

 You can provide the schema while loading the json file, like below:



 sqlContext.read.schema(xxx).json(“…”)?



 Hao

 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Monday, August 24, 2015 6:20 PM
 *To:* user@spark.apache.org
 *Subject:* DataFrame#show cost 2 Spark Jobs ?



 It's weird to me that the simple show function will cost 2 spark jobs.
 DataFrame#explain shows it is a very simple operation, not sure why need 2
 jobs.



 == Parsed Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Analyzed Logical Plan ==

 age: bigint, name: string

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Optimized Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Physical Plan ==

 Scan
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]







 --

 Best Regards

 Jeff Zhang




-- 
Best Regards

Jeff Zhang


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread Jeff Zhang
Hi Satish,

I don't see where Spark supports -i, so I suspect it is provided by DSE. In
that case, it might be a bug in DSE.



On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com
wrote:

 HI Robin,
 Yes, it is DSE but issue is related to Spark only

 Regards,
 Satish Chandra

 On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk
 wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, the below mentioned piece of code works fine in the Spark shell, but the same,
 when placed in a script file and executed with -i <file name>, creates an
 empty RDD

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand I am missing something here due to which my final RDD does
 not have the required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish








-- 
Best Regards

Jeff Zhang


Re: SparkPi is geting java.lang.NoClassDefFoundError: scala/collection/Seq

2015-08-16 Thread Jeff Zhang
Check the examples module's dependencies (right click examples and click Open
Module Settings); by default scala-library is provided, and you need to change
it to compile to run SparkPi in IntelliJ. As I remember, you also need to
change the guava and jetty related libraries to compile too.

On Mon, Aug 17, 2015 at 2:14 AM, xiaohe lan zombiexco...@gmail.com wrote:

 Hi,

 I am trying to run SparkPi in Intellij and getting NoClassDefFoundError.
 Anyone else saw this issue before ?

 Exception in thread main java.lang.NoClassDefFoundError:
 scala/collection/Seq
 at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
 Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 6 more

 Process finished with exit code 1

 Thanks,
 Xiaohe




-- 
Best Regards

Jeff Zhang


Re: Spark Master HA on YARN

2015-08-16 Thread Jeff Zhang
To make it clear, Spark Standalone is similar to YARN in that it is a simple
cluster management system:

Spark Master  ---   Yarn Resource Manager
Spark Worker  ---   Yarn Node Manager

On Mon, Aug 17, 2015 at 4:59 AM, Ruslan Dautkhanov dautkha...@gmail.com
wrote:

 There is no Spark master in YARN mode. It's standalone mode terminology.
 In YARN cluster mode, Spark's Application Master (Spark Driver runs in it)
 will be restarted
 automatically by RM up to yarn.resourcemanager.am.max-retries
 times (default is 2).

 --
 Ruslan Dautkhanov

 On Fri, Jul 17, 2015 at 1:29 AM, Bhaskar Dutta bhas...@gmail.com wrote:

 Hi,

 Is Spark master high availability supported on YARN (yarn-client mode)
 analogous to
 https://spark.apache.org/docs/1.4.0/spark-standalone.html#high-availability
 ?

 Thanks
 Bhaskie





-- 
Best Regards

Jeff Zhang


Re: Always two tasks slower than others, and then job fails

2015-08-14 Thread Jeff Zhang
Data skew? Maybe your partition key has some special value like null or an
empty string.
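
A quick sketch for checking that (pairRdd stands in for your keyed RDD before
the shuffle):

pairRdd.sample(false, 0.01).keys.countByValue().toSeq.sortBy(-_._2).take(10) foreach println   // most frequent keys in a 1% sample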

On Fri, Aug 14, 2015 at 11:01 AM, randylu randyl...@gmail.com wrote:

   It is strange that there are always two tasks slower than others, and the
 corresponding partitions's data are larger, no matter how many partitions?


 Executor ID Address Task Time   Shuffle Read Size /
 Records
 1   slave129.vsvs.com:56691 16 s1   99.5 MB / 18865432
 *10 slave317.vsvs.com:59281 0 ms0   413.5 MB / 311001318*
 100 slave290.vsvs.com:60241 19 s1   110.8 MB / 27075926
 101 slave323.vsvs.com:36246 14 s1   126.1 MB / 25052808

   Task time and records of Executor 10 seems strange, and the cpus on the
 node are all 100% busy.

   Anyone meets the same problem,  Thanks in advance for any answer!




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Always-two-tasks-slower-than-others-and-then-job-fails-tp24257.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Best Regards

Jeff Zhang


Re: Job is Failing automatically

2015-08-11 Thread Jeff Zhang
 15/08/11 12:59:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
3.0 (TID 71, sdldalplhdw02.suddenlink.cequel3.com):
java.lang.NullPointerException

at 
com.suddenlink.pnm.process.HBaseStoreHelper.flush(HBaseStoreHelper.java:313)


It's an error in your application: an NPE thrown from HBaseStoreHelper.



On Wed, Aug 12, 2015 at 5:12 AM, Nikhil Gs gsnikhil1432...@gmail.com
wrote:

 Hello Team,

 I am facing an error which I have pasted below. My job is failing when I
 am copying my data files into flume spool directory. Most of the time the
 job is getting failed. Dont know why..

 Facing this issue several times. Also, for your reference I have attached
 the complete Yarn log file. Please suggest me whats the issue.

 Thanks in advance.

 15/08/11 12:59:30 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in 
 memory on sdldalplhdw02.suddenlink.cequel3.com:35668 (size: 2.1 KB, free: 
 1059.7 MB)
 15/08/11 12:59:31 INFO storage.BlockManagerInfo: Added rdd_5_0 in memory on 
 sdldalplhdw02.suddenlink.cequel3.com:35668 (size: 1693.6 KB, free: 1058.0 MB)
 15/08/11 12:59:32 INFO storage.BlockManagerInfo: Added rdd_7_0 in memory on 
 sdldalplhdw02.suddenlink.cequel3.com:35668 (size: 1697.6 KB, free: 1056.4 MB)
 15/08/11 12:59:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 
 (TID 71, sdldalplhdw02.suddenlink.cequel3.com): java.lang.NullPointerException
   at 
 com.suddenlink.pnm.process.HBaseStoreHelper.flush(HBaseStoreHelper.java:313)
   at 
 com.suddenlink.pnm.process.StoreNodeInHBase$1.call(StoreNodeInHBase.java:57)
   at 
 com.suddenlink.pnm.process.StoreNodeInHBase$1.call(StoreNodeInHBase.java:31)
   at 
 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:304)
   at 
 org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:304)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at 
 org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
   at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
   at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
   at 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
   at 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
   at org.apache.spark.scheduler.Task.run(Task.scala:64)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)

 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 
 3.0 (TID 72, sdldalplhdw02.suddenlink.cequel3.com, NODE_LOCAL, 1179 bytes)
 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 3.0 
 (TID 72) on executor sdldalplhdw02.suddenlink.cequel3.com: 
 java.lang.NullPointerException (null) [duplicate 1]
 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 
 3.0 (TID 73, sdldalplhdw02.suddenlink.cequel3.com, NODE_LOCAL, 1179 bytes)
 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 3.0 
 (TID 73) on executor sdldalplhdw02.suddenlink.cequel3.com: 
 java.lang.NullPointerException (null) [duplicate 2]
 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 
 3.0 (TID 74, sdldalplhdw02.suddenlink.cequel3.com, NODE_LOCAL, 1179 bytes)
 15/08/11 12:59:34 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 3.0 
 (TID 74) on executor sdldalplhdw02.suddenlink.cequel3.com: 
 java.lang.NullPointerException (null) [duplicate 3]
 15/08/11 12:59:34 ERROR scheduler.TaskSetManager: Task 0 in stage 3.0 failed 
 4 times; aborting job
 15/08/11 12:59:34 INFO cluster.YarnClusterScheduler: Removed TaskSet 3.0, 
 whose tasks have all completed, from pool
 15/08/11 12:59:34 INFO cluster.YarnClusterScheduler: Cancelling stage 3
 15/08/11 12:59:34 INFO scheduler.DAGScheduler: Job 2 failed: foreachRDD at 
 NodeProcessor.java:101, took 4.750491 s
 15/08/11 12:59:34 ERROR scheduler.JobScheduler: Error running job streaming 
 job 143931597 ms.0
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
 stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 
 (TID 74, sdldalplhdw02.suddenlink.cequel3.com): java.lang.NullPointerException

 Regards,
 Nik.



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Best Regards

Jeff Zhang


Re: Spark Job Hangs on our production cluster

2015-08-11 Thread Jeff Zhang
(SelectorImpl.java:87)
 - locked 0x00067bf47710 (a
 io.netty.channel.nio.SelectedSelectionKeySet)
 - locked 0x00067bf374e8 (a
 java.util.Collections$UnmodifiableSet)
 - locked 0x00067bf373d0 (a sun.nio.ch.EPollSelectorImpl)
 at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
 at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
 at
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
 at java.lang.Thread.run(Thread.java:745)

 Meantime, I can confirm our Hadoop/HDFS cluster works fine, as the
 MapReduce jobs also run without any problem, and Hadoop fs command works
 fine in the BigInsight.

 I attached the jstack output with this email, but I don't know what could
 be the root reason.
 The same Spark shell command works fine, if I point to the small dataset,
 instead of big dataset. The small dataset will have around 800 HDFS blocks,
 and Spark finishes without any problem.

 Here are some facts I know:

 1) Since the BigInsight is running on IBM JDK, so I make the Spark run
 under the same JDK, same problem for BigData set.
 2) I even changed --total-executor-cores to 42, which will make each
 executor runs with one core (as we have 42 Spark workers), to avoid any
 multithreads, but still no luck.
 3) This problem of scanning 1T data hanging is NOT 100% for sure
 happening. Sometime I didn't see it, but more than 50% I will see it if I
 try.
 4) We never met this issue on our stage cluster, but it has only (1
 namenode + 1 jobtracker + 3 data/task nodes), and the same dataset is only
 160G on it.
 5) While the Spark java processing hanging, I didn't see any exception or
 issue on the HDFS data node log.

 Does anyone have any clue about this?

 Thanks

 Yong




 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Best Regards

Jeff Zhang


Re: How to control Spark Executors from getting Lost when using YARN client mode?

2015-08-04 Thread Jeff Zhang
Please check the node manager logs to see why the container is killed.

On Mon, Aug 3, 2015 at 11:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi all any help will be much appreciated my spark job runs fine but in the
 middle it starts loosing executors because of netafetchfailed exception
 saying shuffle not found at the location since executor is lost
 On Jul 31, 2015 11:41 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi thanks for the response. It looks like YARN container is getting
 killed but dont know why I see shuffle metafetchexception as mentioned in
 the following SO link. I have enough memory 8 nodes 8 cores 30 gig memory
 each. And because of this metafetchexpcetion YARN killing container running
 executor how can it over run memory I tried to give each executor 25 gig
 still it is not sufficient and it fails. Please guide I dont understand
 what is going on I am using Spark 1.4.0 I am using spark.shuffle.memory as
 0.0 and spark.storage.memory as 0.5. I have almost all optimal properties
 like Kyro serializer I have kept 500 akka frame size 20 akka threads dont
 know I am trapped its been two days I am trying to recover from this issue.


 http://stackoverflow.com/questions/29850784/what-are-the-likely-causes-of-org-apache-spark-shuffle-metadatafetchfailedexcept



 On Thu, Jul 30, 2015 at 9:56 PM, Ashwin Giridharan 
 ashwin.fo...@gmail.com wrote:

 What is your cluster configuration ( size and resources) ?

 If you do not have enough resources, then your executor will not run.
 Moreover allocating 8 cores to an executor is too much.

 If you have a cluster with four nodes running NodeManagers, each
 equipped with 4 cores and 8GB of memory,
 then an optimal configuration would be,

 --num-executors 8 --executor-cores 2 --executor-memory 2G

 Thanks,
 Ashwin

 On Thu, Jul 30, 2015 at 12:08 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have one Spark job which runs fine locally with less data but when
 I
 schedule it on YARN to execute I keep on getting the following ERROR and
 slowly all executors gets removed from UI and my job fails

 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on
 myhost1.com: remote Rpc client disassociated
 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on
 myhost2.com: remote Rpc client disassociated
 I use the following command to schedule spark job in yarn-client mode

  ./spark-submit --class com.xyz.MySpark --conf
 spark.executor.extraJavaOptions=-XX:MaxPermSize=512M
 --driver-java-options
 -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client
 --executor-memory 2G --executor-cores 8 --num-executors 12
 /home/myuser/myspark-1.0.jar

 I dont know what is the problem please guide. I am new to Spark. Thanks
 in
 advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Thanks  Regards,
 Ashwin Giridharan





-- 
Best Regards

Jeff Zhang


Re: Spark on YARN

2015-07-30 Thread Jeff Zhang
:59596
 15/07/30 12:13:34 INFO util.Utils: Successfully started service 'SparkUI' on 
 port 59596.
 15/07/30 12:13:34 INFO ui.SparkUI: Started SparkUI at http://10.21.1.77:59596
 15/07/30 12:13:34 INFO cluster.YarnClusterScheduler: Created 
 YarnClusterScheduler
 15/07/30 12:13:34 INFO util.Utils: Successfully started service 
 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43354.
 15/07/30 12:13:34 INFO netty.NettyBlockTransferService: Server created on 
 43354
 15/07/30 12:13:34 INFO storage.BlockManagerMaster: Trying to register 
 BlockManager
 15/07/30 12:13:34 INFO storage.BlockManagerMasterEndpoint: Registering block 
 manager 10.21.1.77:43354 with 246.0 MB RAM, BlockManagerId(driver, 
 10.21.1.77, 43354)
 15/07/30 12:13:34 INFO storage.BlockManagerMaster: Registered BlockManager
 15/07/30 12:13:34 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
 ApplicationMaster registered as 
 AkkaRpcEndpointRef(Actor[akka://sparkDriver/user/YarnAM#-603094240])
 15/07/30 12:13:34 INFO client.RMProxy: Connecting to ResourceManager at 
 hadoop-1/10.21.1.77:8030
 15/07/30 12:13:34 INFO yarn.YarnRMClient: Registering the ApplicationMaster
 15/07/30 12:13:34 INFO yarn.YarnAllocator: Will request 2 executor 
 containers, each with 1 cores and 1408 MB memory including 384 MB overhead
 15/07/30 12:13:34 INFO yarn.YarnAllocator: Container request (host: Any, 
 capability: memory:1408, vCores:1)
 15/07/30 12:13:34 INFO yarn.YarnAllocator: Container request (host: Any, 
 capability: memory:1408, vCores:1)
 15/07/30 12:13:35 INFO yarn.ApplicationMaster: Started progress reporter 
 thread - sleep time : 5000
 15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
 15/07/30 12:13:35 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, 
 exitCode: 0, (reason: Shutdown hook called before final status was reported.)
 15/07/30 12:13:35 INFO yarn.ApplicationMaster: Unregistering 
 ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before 
 final status was reported.)
 15/07/30 12:13:35 INFO impl.AMRMClientImpl: Waiting for application to be 
 successfully unregistered.
 15/07/30 12:13:35 INFO yarn.ApplicationMaster: Deleting staging directory 
 .sparkStaging/application_1438090734187_0010
 15/07/30 12:13:35 INFO storage.DiskBlockManager: Shutdown hook called
 15/07/30 12:13:35 INFO util.Utils: Shutdown hook called
 15/07/30 12:13:35 INFO util.Utils: Deleting directory 
 /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/userFiles-337c9be5-569f-43ff-ba1f-ec24daab9ea5
 15/07/30 12:13:35 INFO util.Utils: Deleting directory 
 /home/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1438090734187_0010/httpd-d1232310-5aa1-44e7-a99a-cc2ae614f89c






-- 
Best Regards

Jeff Zhang


Re: help plz! how to use zipWithIndex to each subset of a RDD

2015-07-29 Thread Jeff Zhang
This may be what you want

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local").setAppName("test")
val sc = new SparkContext(conf)

val inputRdd = sc.parallelize(Array(("key_1", "a"), ("key_1", "b"),
  ("key_2", "c"), ("key_2", "d")))

val result = inputRdd.groupByKey().flatMap(e => {
  val key = e._1
  val valuesWithIndex = e._2.zipWithIndex
  valuesWithIndex.map(value => (key, value._2, value._1))
})

result.collect() foreach println


/// output

(key_2,0,c)
(key_2,1,d)
(key_1,0,a)
(key_1,1,b)


On Thu, Jul 30, 2015 at 10:19 AM, ayan guha guha.a...@gmail.com wrote:

 Is there a relationship between data and index? I.e with a,b,c to 1,2,3?
 On 30 Jul 2015 12:13, askformore askf0rm...@163.com wrote:

 I have some data like this: RDD[(String, String)] = ((*key-1*, a), (
 *key-1*,b), (*key-2*,a), (*key-2*,c),(*key-3*,b),(*key-4*,d)) and I want
 to group the data by Key, and for each group, add index fields to the
 groupmember, at last I can transform the data to below : RDD[(String,
 *Int*, String)] = ((key-1,*1*, a), (key-1,*2,*b), (key-2,*1*,a), (key-2,
 *2*,b),(key-3,*1*,b),(key-4,*1*,d)) I tried to groupByKey firstly, then
 I got a RDD[(String, Iterable[String])], but I don't know how to use
 zipWithIndex function to each Iterable... thanks.
 --
 View this message in context: help plz! how to use zipWithIndex to each
 subset of a RDD
 http://apache-spark-user-list.1001560.n3.nabble.com/help-plz-how-to-use-zipWithIndex-to-each-subset-of-a-RDD-tp24071.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.




-- 
Best Regards

Jeff Zhang


Re: Console log file of CoarseGrainedExecutorBackend

2015-07-16 Thread Jeff Zhang
By default it is in ${SPARK_HOME}/work/${APP_ID}/${EXECUTOR_ID}

On Thu, Jul 16, 2015 at 3:43 PM, Tao Lu taolu2...@gmail.com wrote:

 Hi, Guys,

 Where can I find the console log file of CoarseGrainedExecutorBackend
 process?

 Thanks!

 Tao




-- 
Best Regards

Jeff Zhang


Re: The auxService:spark_shuffle does not exist

2015-07-07 Thread Jeff Zhang
Did you enable dynamic resource allocation? You can refer to this page
for how to configure the Spark shuffle service for YARN:

https://spark.apache.org/docs/1.4.0/job-scheduling.html
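
For reference, a minimal sketch of the usual setup (property and class names as
documented for the external shuffle service; adjust to your distribution, and the
spark-<version>-yarn-shuffle jar has to be on each NodeManager's classpath):

# spark-defaults.conf
spark.dynamicAllocation.enabled  true
spark.shuffle.service.enabled    true

<!-- yarn-site.xml on each NodeManager, then restart the NodeManagers -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>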


On Tue, Jul 7, 2015 at 10:55 PM, roy rp...@njit.edu wrote:

 we tried --master yarn-client with no different result.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/The-auxService-spark-shuffle-does-not-exist-tp23662p23689.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Best Regards

Jeff Zhang


Re: Why does driver transfer application jar to executors?

2015-06-17 Thread Jeff Zhang
TaskDescription only serializes the jar path, not the jar content. Multiple
tasks can run on the same executor, and the executor checks whether the jar has
already been fetched each time a task is launched; if so, it won't fetch it
again.
Serializing only the jar path avoids serializing the jar multiple times, which
would be inefficient.

On Thu, Jun 18, 2015 at 10:48 AM, Shiyao Ma i...@introo.me wrote:

 Hi,

 Looking from my executor logs, the submitted application jar is
 transmitted to each executors?

 Why does spark do the above? To my understanding, the tasks to be run
 are already serialized with TaskDescription.


 Regards.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Best Regards

Jeff Zhang


Re: Filter operation to return two RDDs at once.

2015-06-03 Thread Jeff Zhang
As far as I know, Spark doesn't support multiple outputs.
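
A common workaround is to cache the parent RDD so the second pass does not
recompute it, then filter twice; a sketch using the names from Deepak's snippet
quoted below:

val cached = rawQtSession.cache()
val qtSessionsWithQt   = cached.filter(_._2.qualifiedTreatmentId != NULL_VALUE)
val guidUidMapSessions = cached.filter(_._2.qualifiedTreatmentId == NULL_VALUE)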

On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote:

 Why do you need to do that if filter and content of the resulting rdd are
 exactly same? You may as well declare them as 1 RDD.
 On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I want to do this

 val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId
 != NULL_VALUE)

 val guidUidMapSessions = rawQtSession.filter(_._2.
 qualifiedTreatmentId == NULL_VALUE)

 This will run two different stages can this be done in one stage ?

 val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.
 *magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE)




 --
 Deepak




-- 
Best Regards

Jeff Zhang


Re: Filter operation to return two RDDs at once.

2015-06-03 Thread Jeff Zhang
I checked RDD#randomSplit; it is more like multiple one-to-one
transformations rather than a single one-to-multiple transformation.

I wrote one sample of code as follows; it generates 3 stages. Although
we can use cache here to make it better, if Spark could support multiple
outputs, only 2 stages would be needed. (This would be useful for Pig's
multi-query execution and Hive's self join.)


val data = sc.textFile("/Users/jzhang/a.log").flatMap(line => line.split("\\s")).map(w => (w, 1))
val parts = data.randomSplit(Array(0.2,0.8))
val joinResult = parts(0).join(parts(1))
println(joinResult.toDebugString)


(1) MapPartitionsRDD[8] at join at WordCount.scala:22 []
 |  MapPartitionsRDD[7] at join at WordCount.scala:22 []
 |  CoGroupedRDD[6] at join at WordCount.scala:22 []
 +-(1) PartitionwiseSampledRDD[4] at randomSplit at WordCount.scala:21 []
 |  |  MapPartitionsRDD[3] at map at WordCount.scala:20 []
 |  |  MapPartitionsRDD[2] at flatMap at WordCount.scala:20 []
 |  |  /Users/jzhang/a.log MapPartitionsRDD[1] at textFile at
WordCount.scala:20 []
 |  |  /Users/jzhang/a.log HadoopRDD[0] at textFile at WordCount.scala:20 []
 +-(1) PartitionwiseSampledRDD[5] at randomSplit at WordCount.scala:21 []
|  MapPartitionsRDD[3] at map at WordCount.scala:20 []
|  MapPartitionsRDD[2] at flatMap at WordCount.scala:20 []
|  /Users/jzhang/a.log MapPartitionsRDD[1] at textFile at
WordCount.scala:20 []
|  /Users/jzhang/a.log HadoopRDD[0] at textFile at WordCount.scala:20 []


On Wed, Jun 3, 2015 at 2:45 PM, Sean Owen so...@cloudera.com wrote:

 In the sense here, Spark actually does have operations that make multiple
 RDDs like randomSplit. However there is not an equivalent of the partition
 operation which gives the elements that matched and did not match at once.

 On Wed, Jun 3, 2015, 8:32 AM Jeff Zhang zjf...@gmail.com wrote:

 As far as I know, spark don't support multiple outputs

 On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote:

 Why do you need to do that if filter and content of the resulting rdd
 are exactly same? You may as well declare them as 1 RDD.
 On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I want to do this

 val qtSessionsWithQt = rawQtSession.filter(_._2.
 qualifiedTreatmentId != NULL_VALUE)

 val guidUidMapSessions = rawQtSession.filter(_._2.
 qualifiedTreatmentId == NULL_VALUE)

 This will run two different stages can this be done in one stage ?

 val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.
 *magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE)




 --
 Deepak




 --
 Best Regards

 Jeff Zhang




-- 
Best Regards

Jeff Zhang


Re: ERROR cluster.YarnScheduler: Lost executor

2015-06-03 Thread Jeff Zhang
node down or container preempted ? You need to check the executor log /
node manager log for more info.

On Wed, Jun 3, 2015 at 2:31 PM, patcharee patcharee.thong...@uni.no wrote:

 Hi,

 What can be the cause of this ERROR cluster.YarnScheduler: Lost executor?
 How can I fix it?

 Best,
 Patcharee

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Best Regards

Jeff Zhang


No overwrite flag for saveAsXXFile

2015-03-06 Thread Jeff Zhang
Hi folks,

I found that RDD#saveAsXXFile has no overwrite flag, which I think would be very
helpful. Is there any reason for this?
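
A common workaround (just a sketch, not a built-in flag) is to delete the target
path first through the Hadoop FileSystem API, or to relax the existing-output
check with spark.hadoop.validateOutputSpecs=false:

import org.apache.hadoop.fs.{FileSystem, Path}

val out = new Path("hdfs:///tmp/output")            // example path
val fs = FileSystem.get(sc.hadoopConfiguration)
if (fs.exists(out)) fs.delete(out, true)            // recursive delete
rdd.saveAsTextFile(out.toString)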



-- 
Best Regards

Jeff Zhang


Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Jeff Zhang
Hi Sean,

  If you know a stage needs unusually high parallelism for example you can
repartition further for that stage.

The problem is that we may not know whether high parallelism is needed. E.g.
for the join operator, high parallelism may only be necessary for datasets
where lots of records join together, while for other datasets high
parallelism may not be necessary if only a few records join together.

So my point is that being unable to change parallelism dynamically at runtime
may not be flexible enough.
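
For illustration, a sketch of the per-stage tuning Sean mentions below (someRdd
is just a placeholder):

val wide   = someRdd.repartition(2000)   // raise parallelism before an expensive stage (full shuffle)
val narrow = wide.coalesce(200)          // shrink the partition count afterwards without a full shuffle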



On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote:

 Hm, what do you mean? You can control, to some extent, the number of
 partitions when you read the data, and can repartition if needed.

 You can set the default parallelism too so that it takes effect for most
 ops thay create an RDD. One # of partitions is usually about right for all
 work (2x or so the number of execution slots).

 If you know a stage needs unusually high parallelism for example you can
 repartition further for that stage.
  On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote:

 Thanks Sean.

 But if the partitions of RDD is determined before hand, it would not be
 flexible to run the same program on the different dataset. Although for the
 first stage the partitions can be determined by the input data set, for the
 intermediate stage it is not possible. Users have to create policy to
 repartition or coalesce based on the data set size.


 On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote:

 An RDD has a certain fixed number of partitions, yes. You can't change
 an RDD. You can repartition() or coalese() and RDD to make a new one
 with a different number of RDDs, possibly requiring a shuffle.

 On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote:
  I mean is it possible to change the partition number at runtime. Thanks
 
 
  --
  Best Regards
 
  Jeff Zhang




 --
 Best Regards

 Jeff Zhang




-- 
Best Regards

Jeff Zhang


Re: Is the RDD's Partitions determined before hand ?

2015-03-03 Thread Jeff Zhang
Thanks Sean.

But if the partitions of an RDD are determined beforehand, it would not be
flexible to run the same program on different datasets. Although for the
first stage the partitions can be determined by the input data set, for the
intermediate stages it is not possible. Users have to create a policy to
repartition or coalesce based on the data set size.


On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote:

 An RDD has a certain fixed number of partitions, yes. You can't change
 an RDD. You can repartition() or coalese() and RDD to make a new one
 with a different number of RDDs, possibly requiring a shuffle.

 On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote:
  I mean is it possible to change the partition number at runtime. Thanks
 
 
  --
  Best Regards
 
  Jeff Zhang




-- 
Best Regards

Jeff Zhang

