Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-05 Thread Aaron Davidson
ConnectionManager has been deprecated and is no longer used by default
(NettyBlockTransferService is the replacement). Hopefully you would no
longer see these messages unless you have explicitly flipped it back on.
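(For reference, and not from the original reply: the toggle being referred to is presumably the block transfer service setting, shown here only as a hedged sketch.)

    // "netty" (NettyBlockTransferService) has been the default since 1.2;
    // "nio" would switch back to the old ConnectionManager path.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.shuffle.blockTransferService", "netty")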

On Tue, Aug 4, 2015 at 6:14 PM, Jim Green openkbi...@gmail.com wrote:

 And also https://issues.apache.org/jira/browse/SPARK-3106
 This one is still open.

 On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote:

 *Symptom:*
 Even the sample job fails:
 $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi 10
 Pi is roughly 3.140636
 ERROR ConnectionManager: Corresponding SendingConnection to
 ConnectionManagerId(xxx,) not found
 WARN ConnectionManager: All connections not cleaned up

 Found https://issues.apache.org/jira/browse/SPARK-3322
 But the code changes are not in newer versions of Spark; however, this JIRA
 is marked as fixed.
 Is this issue really fixed in latest version? If so, what is the related
 JIRA?

 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)




 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)



Re: Total delay per batch in a CSV file

2015-08-05 Thread Saisai Shao
Hi,

A lot of Spark Streaming's internal state is exposed through StreamingListener,
including what you see in the web UI, so you could write your own StreamingListener,
register it with the StreamingContext to get the internal information of Spark
Streaming, and write it to a CSV file.

You could check the source code here (
https://github.com/apache/spark/blob/3c0156899dc1ec1f7dfe6d7c8af47fa6dc7d00bf/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
).
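As a rough illustration, here is a minimal sketch of such a listener (not from the original
reply; the file path and the choice of fields are assumptions):

import java.io.{FileWriter, PrintWriter}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Appends one CSV line per completed batch: batch timestamp, input size, total delay (ms).
class CsvBatchListener(path: String) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    val line = Seq(
      info.batchTime.milliseconds,
      info.numRecords,
      info.totalDelay.getOrElse(-1L)
    ).mkString(",")
    val out = new PrintWriter(new FileWriter(path, true))
    try out.println(line) finally out.close()
  }
}

// Register it before starting the context:
// ssc.addStreamingListener(new CsvBatchListener("/tmp/batch-metrics.csv"))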

Thanks
Saisai


On Tue, Aug 4, 2015 at 6:58 PM, allonsy luke1...@gmail.com wrote:

 Hi everyone,

 I'm working with Spark Streaming, and I need to perform some offline
 performance measures.

 What I'd like to have is a CSV file that reports something like this:

 *Batch number/timestamp    Input Size    Total Delay*

 which is in fact similar to what the UI outputs.

 I tried to get some metrics (metrics.properties), but I'm having a hard time
 getting precise information on every single batch, since they only have
 entries concerning the /last/ (completed/received) batch, and the values are
 often different from those appearing in the UI.

 Can anybody give me some advice on how to get metrics that are close to
 those of the UI?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Total-delay-per-batch-in-a-CSV-file-tp24129.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Combining Spark Files with saveAsTextFile

2015-08-05 Thread Igor Berman
using coalesce might be dangerous, since one worker process will need to
handle the whole file, and if the file is huge you'll get an OOM; however it
depends on the implementation, I'm not sure how it will be done.
nevertheless, it's worth trying the coalesce method (please post your results)

another option would be to use FileUtil.copyMerge, which copies each
partition one after another into a destination stream (file); so as soon as
you've written your hdfs file with spark with multiple partitions in
parallel (as usual), you can then add another step to merge it into any
destination you want
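A minimal sketch of the copyMerge approach (the paths are placeholders, not from the
original message):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)
FileUtil.copyMerge(
  fs, new Path("hdfs:///tmp/output-parts"),     // directory written by saveAsTextFile
  fs, new Path("hdfs:///tmp/output-merged.txt"),
  false,                                        // deleteSource: keep the part files
  hadoopConf,
  null)                                         // addString: no separator between parts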

On 5 August 2015 at 07:43, Mohammed Guller moham...@glassbeam.com wrote:

 Just to further clarify, you can first call coalesce with argument 1 and
 then call saveAsTextFile. For example,



 rdd.coalesce(1).saveAsTextFile(...)







 Mohammed



 *From:* Mohammed Guller
 *Sent:* Tuesday, August 4, 2015 9:39 PM
 *To:* 'Brandon White'; user
 *Subject:* RE: Combining Spark Files with saveAsTextFile



 One option is to use the coalesce method in the RDD class.



 Mohammed



 *From:* Brandon White [mailto:bwwintheho...@gmail.com]
 *Sent:* Tuesday, August 4, 2015 7:23 PM
 *To:* user
 *Subject:* Combining Spark Files with saveAsTextFile



 What is the best way to make saveAsTextFile save as only a single file?



Debugging Spark job in Eclipse

2015-08-05 Thread Deepesh Maheshwari
Hi,

A Spark job is executed when you run the start() method of
JavaStreamingContext.
All the operations like map and flatMap are defined earlier, but even though
you put breakpoints in the function, the breakpoint doesn't stop there. So
how can I debug the Spark jobs?

JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    private static final long serialVersionUID = -2042174881679341118L;

    @Override
    public Iterable<String> call(String t) throws Exception {

        // Mark Debug Point here, it doesn't stop here.

        return Lists.newArrayList(SPACE.split(t));
    }
});

Please suggest how I can see the in-between data values.

Regards,
Deepesh


Implementing algorithms in GraphX pregel

2015-08-05 Thread Krish
Hi,
I was recently looking into spark graphx as one of the frameworks that can
help me solve some graph related problems.

The 'think-like-a-vertex' paradigm is something new to me and I cannot wrap
my head around how to implement simple algorithms like depth-first or
breadth-first search, or even getting a list of all simple paths between 2
vertices, in GraphX.

1. Any clue on how to approach the above problems using GraphX Pregel?

2. Is there any documentation as to how to send different/unique initial
message to all vertices?

3. How do I send the initial message to only a subset of the vertices?

4. How does 'voting-to-stop' work?

Thanks in advance for any pointers and explanations.


--
κρισhναν


RE: Spark SQL support for Hive 0.14

2015-08-05 Thread Ishwardeep Singh
Thanks Steve and Michael for your response.

Is there a tentative release date for Spark 1.5?

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Tuesday, August 4, 2015 11:53 PM
To: Steve Loughran ste...@hortonworks.com
Cc: Ishwardeep Singh ishwardeep.si...@impetus.co.in; user@spark.apache.org
Subject: Re: Spark SQL support for Hive 0.14

I'll add that while Spark SQL 1.5 compiles against Hive 1.2.1, it has support 
for reading from metastores for Hive 0.12 - 1.2.1
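(Not from the original thread: as a hedged sketch, the metastore version Spark SQL talks to
is typically selected with the spark.sql.hive.metastore.* settings; the version string and
jar path below are illustrative assumptions.)

val conf = new org.apache.spark.SparkConf()
  .set("spark.sql.hive.metastore.version", "0.14.0")                 // match your existing metastore
  .set("spark.sql.hive.metastore.jars", "/path/to/hive-0.14-jars/*")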

On Tue, Aug 4, 2015 at 9:59 AM, Steve Loughran ste...@hortonworks.com wrote:
Spark 1.3.1 & 1.4 only support Hive 0.13

Spark 1.5 is going to be released against Hive 1.2.1; it'll skip Hive 0.14 
support entirely and go straight to the currently supported Hive release.

See SPARK-8064 for the gory details

 On 3 Aug 2015, at 23:01, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote:

 Hi,

 Does spark SQL support Hive 0.14? The documentation refers to Hive 0.13. Is
 there a way to compile spark with Hive 0.14?

 Currently we are using Spark 1.3.1.

 Thanks



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-support-for-Hive-0-14-tp24122.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org











Fwd: MLLIB MulticlassMetrics Unable to find class key

2015-08-05 Thread Hayri Volkan Agun
Hi everyone,

After a successful prediction, MulticlassMetrics returns an error about being
unable to find the key of the class when computing precision.

Is there a way to check whether the metrics contains the class key?
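(Not from the original message, but one possible guard, sketched under the assumption that
the metrics were built from an RDD of (prediction, label) pairs: the labels method lists the
classes that actually occur, so membership can be checked before asking for precision.)

import org.apache.spark.mllib.evaluation.MulticlassMetrics

val predictionAndLabels = sc.parallelize(Seq((1.0, 1.0), (0.0, 1.0), (1.0, 0.0)))  // toy data
val metrics = new MulticlassMetrics(predictionAndLabels)
val classKey = 3.0
// Only ask for precision if the class key was actually observed.
val precision =
  if (metrics.labels.contains(classKey)) Some(metrics.precision(classKey)) else None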

--
Hayri Volkan Agun
PhD. Student - Anadolu University


Re: Combining Spark Files with saveAsTextFile

2015-08-05 Thread Igor Berman
seems that coalesce does work, see the following thread
https://www.mail-archive.com/user%40spark.apache.org/msg00928.html

On 5 August 2015 at 09:47, Igor Berman igor.ber...@gmail.com wrote:

 using coalesce might be dangerous, since one worker process will need to
 handle the whole file, and if the file is huge you'll get an OOM; however it
 depends on the implementation, I'm not sure how it will be done.
 nevertheless, it's worth trying the coalesce method (please post your results)

 another option would be to use FileUtil.copyMerge which copies each
 partition one after another into destination stream(file); so as soon as
 you've written your hdfs file with spark with multiple partitions in
 parallel(as usual), you can then make another step to merge it into any
 destination you want

 On 5 August 2015 at 07:43, Mohammed Guller moham...@glassbeam.com wrote:

 Just to further clarify, you can first call coalesce with argument 1 and
 then call saveAsTextFile. For example,



 rdd.coalesce(1).saveAsTextFile(...)







 Mohammed



 *From:* Mohammed Guller
 *Sent:* Tuesday, August 4, 2015 9:39 PM
 *To:* 'Brandon White'; user
 *Subject:* RE: Combining Spark Files with saveAsTextFile



 One option is to use the coalesce method in the RDD class.



 Mohammed



 *From:* Brandon White [mailto:bwwintheho...@gmail.com]
 *Sent:* Tuesday, August 4, 2015 7:23 PM
 *To:* user
 *Subject:* Combining Spark Files with saveAsTextFile



 What is the best way to make saveAsTextFile save as only a single file?





Spark Streaming - CheckPointing issue

2015-08-05 Thread Sadaf
Hi
I've done Twitter streaming using Twitter's streaming user API and Spark
Streaming. This runs successfully on my local machine, but when I run this
program on the cluster in local mode, it only runs successfully the very
first time. Later on it gives the following exception.
 
Exception in thread "main" org.apache.spark.SparkException: Found both
spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former.

and SPARK_CLASSPATH is unset already!!
I have to make a new checkpoint directory each time to make it run
successfully; otherwise it shows the above exception.

Can anyone help me to resolve this issue?
Thanks :) 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-CheckPointing-issue-tp24139.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: About memory leak in spark 1.4.1

2015-08-05 Thread Sea
No one helped me... I helped myself: I split the cluster into two clusters, 1.4.1 
and 1.3.0.




------------------ Original Email ------------------
From: Ted Yu <yuzhih...@gmail.com>
Date: August 4, 2015, 10:28
To: Igor Berman <igor.ber...@gmail.com>
Cc: Sea <261810...@qq.com>; Barak Gitsis <bar...@similarweb.com>; user@spark.apache.org; rxin <r...@databricks.com>; joshrosen <joshro...@databricks.com>; davies <dav...@databricks.com>
Subject: Re: About memory leak in spark 1.4.1



w.r.t. spark.deploy.spreadOut , here is the scaladoc:

  // As a temporary workaround before better ways of configuring memory, we 
allow users to set
  // a flag that will perform round-robin scheduling across the nodes 
(spreading out each app
  // among all the nodes) instead of trying to consolidate each app onto a 
small # of nodes.
  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)



Cheers


On Tue, Aug 4, 2015 at 4:13 AM, Igor Berman igor.ber...@gmail.com wrote:
sorry, can't disclose info about my prod cluster
nothing jumps into my mind regarding your config
we don't use lz4 compression; I don't know what spark.deploy.spreadOut is (there 
is no documentation regarding this)


If you are sure that you don't have a memory leak in your business logic, I would 
try to reset each property to its default (or just remove it from your config) and 
try to run your job to see if it's not somehow connected.



my config(nothing special really)
spark.shuffle.consolidateFiles true
spark.speculation false

spark.executor.extraJavaOptions -XX:+UseStringCache -XX:+UseCompressedStrings 
-XX:+PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:gc.log 
-verbose:gc
spark.executor.logs.rolling.maxRetainedFiles 1000
spark.executor.logs.rolling.strategy time
spark.worker.cleanup.enabled true
spark.logConf true
spark.rdd.compress true











On 4 August 2015 at 12:59, Sea 261810...@qq.com wrote:
How many machines are there in your standalone cluster?

I am not using tachyon.


GC can not help me... Can anyone help ?


my configuration:


spark.deploy.spreadOut false
spark.eventLog.enabled true
spark.executor.cores 24


spark.ui.retainedJobs 10
spark.ui.retainedStages 10
spark.history.retainedApplications 5
spark.deploy.retainedApplications 10
spark.deploy.retainedDrivers  10
spark.streaming.ui.retainedBatches 10
spark.sql.thriftserver.ui.retainedSessions 10
spark.sql.thriftserver.ui.retainedStatements 100



spark.file.transferTo false
spark.driver.maxResultSize 4g
spark.sql.hive.metastore.jars=/spark/spark-1.4.1/hive/*


spark.eventLog.dir hdfs://mycluster/user/spark/historylog
spark.history.fs.logDirectory hdfs://mycluster/user/spark/historylog



spark.driver.extraClassPath=/spark/spark-1.4.1/extlib/*
spark.executor.extraClassPath=/spark/spark-1.4.1/extlib/*



spark.sql.parquet.binaryAsString true
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer 32
spark.kryoserializer.buffer.max 256
spark.shuffle.consolidateFiles true
spark.io.compression.codec org.apache.spark.io.LZ4CompressionCodec











------------------ Original Email ------------------
From: Igor Berman <igor.ber...@gmail.com>
Date: August 3, 2015, 7:56
To: Sea <261810...@qq.com>
Cc: Barak Gitsis <bar...@similarweb.com>; Ted Yu <yuzhih...@gmail.com>; user@spark.apache.org; rxin <r...@databricks.com>; joshrosen <joshro...@databricks.com>; davies <dav...@databricks.com>
Subject: Re: About memory leak in spark 1.4.1





in general, what is your configuration? use --conf spark.logConf=true



we have 1.4.1 in a production standalone cluster and haven't experienced what you 
are describing. Can you verify in the web UI that Spark indeed got your 50g per 
executor limit? I mean in the configuration page..


might it be that you are using off-heap storage (Tachyon)?




On 3 August 2015 at 04:58, Sea 261810...@qq.com wrote:
"spark uses a lot more than heap memory, it is the expected behavior."  It 
didn't exist in spark 1.3.x.
What does "a lot more than" mean?  It means that I lose control of it!
I try to apply 31g, but it still grows to 55g and continues to grow!!! That is 
the point!
I have tried setting memoryFraction to 0.2, but it didn't help.
I don't know whether it will still exist in the next release 1.5; I wish not.






------------------ Original Email ------------------
From: Barak Gitsis <bar...@similarweb.com>
Date: August 2, 2015, 9:55
To: Sea <261810...@qq.com>; Ted Yu <yuzhih...@gmail.com>
Cc: user@spark.apache.org; rxin <r...@databricks.com>; joshrosen <joshro...@databricks.com>; davies <dav...@databricks.com>
Subject: Re: About memory leak in spark 1.4.1





spark uses a lot more than heap memory, it is the expected behavior. In 1.4 
off-heap memory usage is supposed to grow in comparison to 1.3.


Better use as little memory as you can for heap, and since you are not 
utilizing it already, it is safe for you to reduce it.
memoryFraction helps 

Label based MLLib MulticlassMetrics is buggy

2015-08-05 Thread Hayri Volkan Agun
The results in MulticlassMetrics are totally wrong. They are improperly
calculated.
The confusion matrix may be right, I don't know, but the per-label scores are
wrong.

-- 
Hayri Volkan Agun
PhD. Student - Anadolu University


Re: large scheduler delay in pyspark

2015-08-05 Thread gen tang
Hi,
Thanks a lot for your reply.


It seems that it is because of the slowness of the second code.
I rewrote the code as list(set([i.items for i in a] + [i.items for i in b])).
The program returns to normal.

By the way, I find that while the computation is running, the UI will show
scheduler delay. However, it is not really scheduler delay. When the computation
finishes, the UI will show the correct scheduler delay time.

Cheers
Gen


On Tue, Aug 4, 2015 at 3:13 PM, Davies Liu dav...@databricks.com wrote:

 On Mon, Aug 3, 2015 at 9:00 AM, gen tang gen.tan...@gmail.com wrote:
  Hi,
 
  Recently, I met some problems about scheduler delay in pyspark. I worked
  several days on this problem, but not success. Therefore, I come to here
 to
  ask for help.
 
  I have a key_value pair rdd like rdd[(key, list[dict])] and I tried to
 merge
  value by adding two list
 
  if I do reduceByKey as follows:
 rdd.reduceByKey(lambda a, b: a+b)
  It works fine, scheduler delay is less than 10s. However if I do
  reduceByKey:
 def f(a, b):
 for i in b:
  if i not in a:
 a.append(i)
 return a
rdd.reduceByKey(f)

 Is it possible that you have a large object that is also named `i` or `a` or
 `b`?

 Btw, the second one could be slower than the first one, because you try to look up
 an object in a list, which is O(N), especially when the object is large
 (a dict).

 It will cause very large scheduler delay, about 15-20 mins. (The data I deal
 with is about 300 MB, and I use 5 machines with 32GB memory.)

 If you see scheduler delay, it means there may be a large broadcast
 involved.

  I know the second code is not the same as the first. In fact, my purpose
 is
  to implement the second, but not work. So I try the first one.
  I don't know whether this is related to the data(with long string) or
 Spark
  on Yarn. But the first code works fine on the same data.
 
  Is there any way to find out the log when spark stall in scheduler delay,
  please? Or any ideas about this problem?
 
  Thanks a lot in advance for your help.
 
  Cheers
  Gen
 
 



Re: Spark Streaming - CheckPointing issue

2015-08-05 Thread Saisai Shao
Hi,

What Spark version do you use? It looks like a problem of configuration
recovery. I'm not sure if it is a Twitter streaming specific problem; I tried
Kafka streaming with checkpointing enabled on my local machine and saw no such
issue. Did you try to set these configurations somewhere?

Thanks
Saisai

On Wed, Aug 5, 2015 at 8:34 PM, Sadaf sa...@platalytics.com wrote:

 Hi
 i've done the twitter streaming using twitter's streaming user api and
 spark
 streaming. this runs successfully on my local machine. but when i run this
 program on cluster in local mode. it just run successfully for the very
 first time. later on it gives the following exception.

 Exception in thread main org.apache.spark.SparkException: Found both
 spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former.

 and spark class path is unset already!!
 I have to make a new checkpoint directory each time to make it run
 successfully. otherwise it shows above exception.

 Can anyone help me to resolve this issue?
 Thanks :)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-CheckPointing-issue-tp24139.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Debugging Spark job in Eclipse

2015-08-05 Thread Eugene Morozov
Deepesh, 

you have to call an action to start actual processing.
words.count() would do the trick.


On 05 Aug 2015, at 11:42, Deepesh Maheshwari deepesh.maheshwar...@gmail.com 
wrote:

 Hi,
 
 A Spark job is executed when you run the start() method of JavaStreamingContext.
 All the operations like map and flatMap are defined earlier, but even though you 
 put breakpoints in the function, the breakpoint doesn't stop there. So how can 
 I debug the Spark jobs?
 
 JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
     private static final long serialVersionUID = -2042174881679341118L;
 
     @Override
     public Iterable<String> call(String t) throws Exception {
 
         // Mark Debug Point here, it doesn't stop here.
 
         return Lists.newArrayList(SPACE.split(t));
     }
 });
 
 Please suggest how I can see the in-between data values.
 
 Regards,
 Deepesh

Eugene Morozov
fathers...@list.ru






Best practices to call hiveContext in DataFrame.foreach in executor program or how to have a for loop in driver program

2015-08-05 Thread unk1102
Hi, I have the following code, which fires hiveContext.sql() most of the time.
My task is to create a few tables and insert values into them after processing,
for every Hive table partition. So I first fire "show partitions" and, using its
output in a for loop, I call a few methods which create the table if it does not
exist and do the insert using hiveContext.sql(). Now, we can't execute
hiveContext in an executor, so I have to execute this for loop in the driver
program, and it runs serially one by one. When I submit this Spark job to the
YARN cluster, almost every time my executor gets lost because of a
shuffle-not-found exception. This is happening because YARN is killing my
executor because of memory overload. I don't understand why: I have a very small
data set for each Hive partition, but it still causes YARN to kill my executor.
Please explain why the following code overloads memory: will the following code
do everything in parallel and try to accommodate all Hive partition data in
memory at the same time? Please guide me; I am blocked because of this issue.

 public static void main(String[] args) throws IOException {
  SparkConf conf = new SparkConf();
  SparkContext sc = new SparkContext(conf);
  HiveContext hc = new HiveContext(sc);

  DataFrame partitionFrame = hiveContext.sql("show partitions dbdata partition(date='2015-08-05')");

  Row[] rowArr = partitionFrame.collect();
  for (Row row : rowArr) {
   String[] splitArr = row.getString(0).split("/");
   String server = splitArr[0].split("=")[1];
   String date = splitArr[1].split("=")[1];
   String csvPath = "hdfs:///user/db/ext/" + server + ".csv";
   if (fs.exists(new Path(csvPath))) {
    hiveContext.sql("ADD FILE " + csvPath);
   }
   createInsertIntoTableABC(hc, entity, date);
   createInsertIntoTableDEF(hc, entity, date);
   createInsertIntoTableGHI(hc, entity, date);
   createInsertIntoTableJKL(hc, entity, date);
   createInsertIntoTableMNO(hc, entity, date);
  }
 }



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-to-call-hiveContext-in-DataFrame-foreach-in-executor-program-or-how-to-have-a-for-loom-tp24141.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL Hive - merge small files

2015-08-05 Thread Brandon White
Hello,

I would love to have Hive merge the small files in my managed hive context
after every query. Right now, I am setting the hive configuration in my
Spark job configuration, but hive is not managing the files. Do I need to
set the hive fields in another place? How do you set Hive configurations in
Spark?

Here is what I'd like to set

hive.merge.mapfiles true
hive.merge.mapredfiles true
hive.merge.size.per.task 25600
hive.merge.smallfiles.avgsize 1600


Memory allocation error with Spark 1.5

2015-08-05 Thread Alexis Seigneurin
Hi,

I'm receiving a memory allocation error with a recent build of Spark 1.5:

java.io.IOException: Unable to acquire 67108864 bytes of memory
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:348)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:398)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
at
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
at
org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)


The issue appears when joining 2 datasets. One with 6084 records, the other
one with 200 records. I'm expecting to receive 200 records in the result.

I'm using a homemade build prepared from branch-1.5 with commit ID
eedb996. I have run mvn -DskipTests clean install to generate that
build.

Apart from that, I'm using Java 1.7.0_51 and Maven 3.3.3.

I've prepared a test case that can be built and executed very easily (data
files are included in the repo):
https://github.com/aseigneurin/spark-testcase

One thing to note is that the issue arises when the master is set to
"local[*]" but not when set to "local". Both options work without problem
with Spark 1.4, though.

Any help will be greatly appreciated!

Many thanks,
Alexis


Re: No Twitter Input from Kafka to Spark Streaming

2015-08-05 Thread narendra
Thanks Akash for the answer. I added endpoint to the listener and now it is
working.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL support for Hive 0.14

2015-08-05 Thread Steve Loughran

On 5 Aug 2015, at 02:08, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote:

Thanks Steve and Michael for your response.

Is there a tentative release date for Spark 1.5?

The branch is off and going through its bug stabilisation/test phase.

This is where you can help contribute to the project by downloading and playing 
with the beta releases when they become available. This will give you early 
access to the code, but most of all, timely fixes to any problems that surface. 
Find a bug in a beta and it should hopefully be fixed within the month. Find a 
bug in a production release and you can wait a lot longer.

This probably matters even more when it comes to that Hive support, because it's 
a big change, including changes to the SQL syntax to track HQL's changes. (More 
succinctly: some of the regression tests broke as things like `date` and 
`double` were now types, and UNION had evolved.)


From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Tuesday, August 4, 2015 11:53 PM
To: Steve Loughran ste...@hortonworks.com
Cc: Ishwardeep Singh ishwardeep.si...@impetus.co.in; user@spark.apache.org
Subject: Re: Spark SQL support for Hive 0.14

I'll add that while Spark SQL 1.5 compiles against Hive 1.2.1, it has support 
for reading from metastores for Hive 0.12 - 1.2.1

On Tue, Aug 4, 2015 at 9:59 AM, Steve Loughran ste...@hortonworks.com wrote:
Spark 1.3.1 & 1.4 only support Hive 0.13

Spark 1.5 is going to be released against Hive 1.2.1; it'll skip Hive 0.14 
support entirely and go straight to the currently supported Hive release.

See SPARK-8064 for the gory details

 On 3 Aug 2015, at 23:01, Ishwardeep Singh ishwardeep.si...@impetus.co.in wrote:

 Hi,

 Does spark SQL support Hive 0.14? The documentation refers to Hive 0.13. Is
 there a way to compile spark with Hive 0.14?

 Currently we are using Spark 1.3.1.

 Thanks



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-support-for-Hive-0-14-tp24122.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org












Re: Unable to load native-hadoop library for your platform

2015-08-05 Thread Steve Loughran

 On 4 Aug 2015, at 12:26, Sean Owen so...@cloudera.com wrote:
 
 Oh good point, does the Windows integration need native libs for
 POSIX-y file system access? I know there are some binaries shipped for
 this purpose but wasn't sure if that's part of what's covered in the
 native libs message.

On Unix it's a warning that you'll get slower access to bits of the system and 
no POSIX-ish support; on Windows it's more serious (and should be reported more 
seriously).


It's used for various low-level things, and currently Hadoop fails somewhat 
messily when it's not there. That's even for things like MiniYarnCluster, which 
is a real annoyance.

I have a snapshot of the Hadoop 2.6 windows binaries if people want them; I 
should stick up a copy of the 2.7.1 ones too
https://github.com/steveloughran/clusterconfigs/tree/master/clusters/morzine/hadoop_home/


 
 On Tue, Aug 4, 2015 at 6:01 PM, Steve Loughran ste...@hortonworks.com wrote:
 Think it may be needed on Windows, certainly if you start trying to work 
 with local files.
 
 
 On 4 Aug 2015, at 00:34, Sean Owen so...@cloudera.com wrote:
 
 It won't affect you if you're not actually running Hadoop. But it's
 mainly things like Snappy/LZO compression which are implemented as
 native libraries under the hood.
 
 There's a lot more in those native libs, primarily to bypass bits missing 
 from the java APIs (FS permissions) and to add new features (encryption, 
 soon erasure coding).
 
 The Hadoop file:// FS uses it on windows, at least for now
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Label based MLLib MulticlassMetrics is buggy

2015-08-05 Thread Feynman Liang
Hi Hayri,

Can you provide a sample of the expected and actual results?

Feynman

On Wed, Aug 5, 2015 at 6:19 AM, Hayri Volkan Agun volkana...@gmail.com
wrote:

 The results in MulticlassMetrics is totally wrong. They are improperly
 calculated.
 Confusion matrix may be true I don't know but for each label scores are
 wrong.

 --
 Hayri Volkan Agun
 PhD. Student - Anadolu University



Re: Label based MLLib MulticlassMetrics is buggy

2015-08-05 Thread Feynman Liang
Also, what version of Spark are you using?

On Wed, Aug 5, 2015 at 9:57 AM, Feynman Liang fli...@databricks.com wrote:

 Hi Hayri,

 Can you provide a sample of the expected and actual results?

 Feynman

 On Wed, Aug 5, 2015 at 6:19 AM, Hayri Volkan Agun volkana...@gmail.com
 wrote:

 The results in MulticlassMetrics is totally wrong. They are improperly
 calculated.
 Confusion matrix may be true I don't know but for each label scores are
 wrong.

 --
 Hayri Volkan Agun
 PhD. Student - Anadolu University





Re: Spark SQL Hive - merge small files

2015-08-05 Thread Michael Armbrust
This feature isn't currently supported.

On Wed, Aug 5, 2015 at 8:43 AM, Brandon White bwwintheho...@gmail.com
wrote:

 Hello,

 I would love to have hive merge the small files in my managed hive context
 after every query. Right now, I am setting the hive configuration in my
 Spark Job configuration but hive is not managing the files. Do I need to
 set the hive fields in around place? How do you set Hive configurations in
 Spark?

 Here is what I'd like to set

  hive.merge.mapfiles true
  hive.merge.mapredfiles true
  hive.merge.size.per.task 25600
  hive.merge.smallfiles.avgsize 1600



Re: Memory allocation error with Spark 1.5

2015-08-05 Thread Reynold Xin
In Spark 1.5, we have a new way to manage memory (part of Project
Tungsten). The default unit of memory allocation is 64MB, which is way too
high when you have 1G of memory allocated in total and have more than 4
threads.

We will reduce the default page size before releasing 1.5.  For now, you
can just reduce spark.buffer.pageSize variable to a lower value (e.g. 16m).

https://github.com/apache/spark/blob/702aa9d7fb16c98a50e046edfd76b8a7861d0391/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala#L125
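(Not from the original reply: one way to apply that setting, as a hedged sketch; the app
name is a placeholder and the 16m value follows the suggestion above.)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("join-test")                 // placeholder name
  .set("spark.buffer.pageSize", "16m")     // lower the Tungsten page size as suggested
val sc = new SparkContext(conf)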

On Wed, Aug 5, 2015 at 9:25 AM, Alexis Seigneurin aseigneu...@ippon.fr
wrote:

 Hi,

 I'm receiving a memory allocation error with a recent build of Spark 1.5:

 java.io.IOException: Unable to acquire 67108864 bytes of memory
 at
 org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:348)
 at
 org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:398)
 at
 org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
 at
 org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
 at
 org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
 at
 org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)


 The issue appears when joining 2 datasets. One with 6084 records, the
 other one with 200 records. I'm expecting to receive 200 records in the
 result.

 I'm using a homemade build prepared from branch-1.5 with commit ID
 eedb996. I have run mvn -DskipTests clean install to generate that
 build.

 Apart from that, I'm using Java 1.7.0_51 and Maven 3.3.3.

 I've prepared a test case that can be built and executed very easily (data
 files are included in the repo):
 https://github.com/aseigneurin/spark-testcase

 One thing to note is that the issue arises when the master is set to
 local[*] but not when set to local. Both options work without problem
 with Spark 1.4, though.

 Any help will be greatly appreciated!

 Many thanks,
 Alexis



Re: Spark SQL Hive - merge small files

2015-08-05 Thread Brandon White
So there is no good way to merge spark files in a managed hive table right
now?

On Wed, Aug 5, 2015 at 10:02 AM, Michael Armbrust mich...@databricks.com
wrote:

 This feature isn't currently supported.

 On Wed, Aug 5, 2015 at 8:43 AM, Brandon White bwwintheho...@gmail.com
 wrote:

 Hello,

 I would love to have hive merge the small files in my managed hive
 context after every query. Right now, I am setting the hive configuration
 in my Spark Job configuration but hive is not managing the files. Do I need
 to set the hive fields in around place? How do you set Hive configurations
 in Spark?

 Here is what I'd like to set

  hive.merge.mapfiles true
  hive.merge.mapredfiles true
  hive.merge.size.per.task 25600
  hive.merge.smallfiles.avgsize 1600





Re: Is SPARK-3322 fixed in latest version of Spark?

2015-08-05 Thread Jim Green
We tested Spark 1.2 and 1.3, and this issue is gone. I know that starting from
1.2, Spark uses Netty instead of NIO.
So you mean that bypasses this issue?

Another question is, why did this error message not show up in Spark 0.9 or
older versions?

On Tue, Aug 4, 2015 at 11:01 PM, Aaron Davidson ilike...@gmail.com wrote:

 ConnectionManager has been deprecated and is no longer used by default
 (NettyBlockTransferService is the replacement). Hopefully you would no
 longer see these messages unless you have explicitly flipped it back on.

 On Tue, Aug 4, 2015 at 6:14 PM, Jim Green openkbi...@gmail.com wrote:

 And also https://issues.apache.org/jira/browse/SPARK-3106
 This one is still open.

 On Tue, Aug 4, 2015 at 6:12 PM, Jim Green openkbi...@gmail.com wrote:

  *Symptom:*
  Even the sample job fails:
 $ MASTER=spark://xxx:7077 run-example org.apache.spark.examples.SparkPi
 10
 Pi is roughly 3.140636
 ERROR ConnectionManager: Corresponding SendingConnection to
 ConnectionManagerId(xxx,) not found
 WARN ConnectionManager: All connections not cleaned up

 Found https://issues.apache.org/jira/browse/SPARK-3322
  But the code changes are not in newer versions of Spark; however, this
  JIRA is marked as fixed.
 Is this issue really fixed in latest version? If so, what is the related
 JIRA?

 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)




 --
 Thanks,
 www.openkb.info
 (Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)





-- 
Thanks,
www.openkb.info
(Open KnowledgeBase for Hadoop/Database/OS/Network/Tool)


Streaming and calculated-once semantics

2015-08-05 Thread Dimitris Kouzis - Loukas
Hello, here's a simple program that demonstrates my problem:


ssc = StreamingContext(sc, 1)

input = [[("k1", 12), ("k2", 14)], [("k1", 22)]]

rawData = ssc.queueStream([sc.parallelize(d, 1) for d in input])

runningRawData = rawData.updateStateByKey(lambda nv, prev: reduce(sum, nv,
prev or 0))

def stats(rdd):
    keyavg = rdd.values().reduce(sum) / rdd.count()
    return rdd.mapValues(lambda i: i - keyavg)

runningRawData.transform(stats).print()


I have a feeling this will calculate keyavg = rdd.values().reduce(sum) /
rdd.count() inside stats quite a few times depending on the number of
partitions on the current rdd.

What would be an alternative way to do this two step computation without
calculating the average many times?


Starting Spark SQL thrift server from within a streaming app

2015-08-05 Thread Daniel Haviv
Hi,
Is it possible to start the Spark SQL thrift server from within a streaming app 
so the streamed data could be queried as it goes in?

Thank you.
Daniel

Spark MLib v/s SparkR

2015-08-05 Thread praveen S
I was wondering when one should go for MLib or SparkR. What are the criteria,
or what should be considered, before choosing either of the solutions for
data analysis?
Or what are the advantages of Spark MLib over SparkR, or the advantages of
SparkR over MLib?


Re: Spark MLib v/s SparkR

2015-08-05 Thread Benamara, Ali
SparkR doesn't support all the ML algorithms yet, the next 1.5 will have more 
support but still not all the algorithms that are currently supported in Mllib. 
SparkR is more of a convenience for R users to get acquainted with Spark at 
this point.
 -Ali

From: praveen S mylogi...@gmail.com
Date: Wednesday, August 5, 2015 at 2:24 PM
To: user@spark.apache.org
Subject: Spark MLib v/s SparkR

I was wondering when one should go for MLib or SparkR. What is the criteria or 
what should be considered before choosing either of the solutions for data 
analysis?
or What is the advantages of Spark MLib over Spark R or advantages of SparkR 
over MLib?


Re: Spark MLib v/s SparkR

2015-08-05 Thread Krishna Sankar
A few points to consider:
a) SparkR gives the union of R_in_a_single_machine and the
distributed_computing_of_Spark:
b) It also gives the ability to wrangle with data in R, that is in the
Spark eco system
c) Coming to MLlib, the question is MLlib and R (not MLlib or R) -
depending on the scale, data location et al
d) As Ali mentioned, some of MLlib might not be supported in R (I
haven't looked at it that carefully, but it can be resolved by the APIs);
OTOH, 1.5 is on its way.
e) So it all depends on the algorithms that one wants to use and whether
one needs R for pre or post processing
HTH.
Cheers
k/

On Wed, Aug 5, 2015 at 11:24 AM, praveen S mylogi...@gmail.com wrote:

 I was wondering when one should go for MLib or SparkR. What is the
 criteria or what should be considered before choosing either of the
 solutions for data analysis?
 or What is the advantages of Spark MLib over Spark R or advantages of
 SparkR over MLib?



spark hangs at broadcasting during a filter

2015-08-05 Thread AlexG
I'm trying to load a 1 TB file whose lines i,j,v represent the values of a
matrix given as A_{ij} = v so I can convert it to a Parquet file. Only some
of the rows of A are relevant, so the following code first loads the
triplets as text, splits them into Tuple3[Int, Int, Double], drops triplets
whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int,
Double]]] for each row (if I'm judging datatypes correctly).

val valsrows = sc.textFile(valsinpath).map(_.split(",")).
  map(x => (x(1).toInt, (x(0).toInt,
x(2).toDouble))).
  filter(x => !droprows.contains(x._1)).
  groupByKey.
  map(x => (x._1, x._2.toSeq.sortBy(_._1)))

Spark hangs during a broadcast that occurs during the filter step (according
to the Spark UI). The last two lines in the log before it pauses are:

5/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in
memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in
memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)

I've left Spark running for up to 17 minutes one time, and it never
continues past this point. I'm using a cluster of 30 r3.8xlarge EC2
instances (244GB, 32 cores) with Spark in standalone mode with 220G executor
and driver memory, and using the Kryo serializer.

Any ideas on what could be causing this hang?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-hangs-at-broadcasting-during-a-filter-tp24143.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark MLib v/s SparkR

2015-08-05 Thread Charles Earl
What machine learning algorithms are you interested in exploring or using?
Start from there or better yet the problem you are trying to solve, and
then the selection may be evident.


On Wednesday, August 5, 2015, praveen S mylogi...@gmail.com wrote:

 I was wondering when one should go for MLib or SparkR. What is the
 criteria or what should be considered before choosing either of the
 solutions for data analysis?
 or What is the advantages of Spark MLib over Spark R or advantages of
 SparkR over MLib?



-- 
- Charles


Set Job Descriptions for Scala application

2015-08-05 Thread Rares Vernica
Hello,

My Spark application is written in Scala and submitted to a Spark cluster
in standalone mode. The Spark Jobs for my application are listed in the
Spark UI like this:

Job Id Description ...
6  saveAsTextFile at Foo.scala:202
5  saveAsTextFile at Foo.scala:201
4  count at Foo.scala:188
3  collect at Foo.scala:182
2  count at Foo.scala:162
1  count at Foo.scala:152
0  collect at Foo.scala:142


Is it possible to assign Job Descriptions to all these jobs in my Scala
code?

Thanks!
Rares


Re: Set Job Descriptions for Scala application

2015-08-05 Thread Mark Hamstra
SparkContext#setJobDescription or SparkContext#setJobGroup
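For example (a minimal sketch; the descriptions and paths below are placeholders, not from
the original Foo.scala):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("job-descriptions"))
val data = sc.parallelize(1 to 100)

sc.setJobDescription("count valid records")
data.count()                                   // this job shows the description above in the UI

sc.setJobDescription("save even numbers")
data.filter(_ % 2 == 0).saveAsTextFile("/tmp/evens")

// Or group several jobs under one id and description:
// sc.setJobGroup("nightly-etl", "Nightly aggregation run")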

On Wed, Aug 5, 2015 at 12:29 PM, Rares Vernica rvern...@gmail.com wrote:

 Hello,

 My Spark application is written in Scala and submitted to a Spark cluster
 in standalone mode. The Spark Jobs for my application are listed in the
 Spark UI like this:

 Job Id Description ...
 6  saveAsTextFile at Foo.scala:202
 5  saveAsTextFile at Foo.scala:201
 4  count at Foo.scala:188
 3  collect at Foo.scala:182
 2  count at Foo.scala:162
 1  count at Foo.scala:152
 0  collect at Foo.scala:142


 Is it possible to assign Job Descriptions to all these jobs in my Scala
 code?

 Thanks!
 Rares



Re: spark hangs at broadcasting during a filter

2015-08-05 Thread Philip Weaver
How big is droprows?

Try explicitly broadcasting it like this:

val broadcastDropRows = sc.broadcast(dropRows)

val valsrows = ...
    .filter(x => !broadcastDropRows.value.contains(x._1))

- Philip


On Wed, Aug 5, 2015 at 11:54 AM, AlexG swift...@gmail.com wrote:

 I'm trying to load a 1 Tb file whose lines i,j,v represent the values of a
 matrix given as A_{ij} = v so I can convert it to a Parquet file. Only some
 of the rows of A are relevant, so the following code first loads the
 triplets are text, splits them into Tuple3[Int, Int, Double], drops
 triplets
 whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int,
 Double]]] for each row (if I'm judging datatypes correctly).

  val valsrows = sc.textFile(valsinpath).map(_.split(",")).
    map(x => (x(1).toInt, (x(0).toInt,
  x(2).toDouble))).
    filter(x => !droprows.contains(x._1)).
    groupByKey.
    map(x => (x._1, x._2.toSeq.sortBy(_._1)))

 Spark hangs during a broadcast that occurs during the filter step
 (according
 to the Spark UI). The last two lines in the log before it pauses are:

 5/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in
 memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
 15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
 in
 memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)

 I've left Spark running for up to 17 minutes one time, and it never
 continues past this point. I'm using a cluster of 30 r3.8xlarge EC2
 instances (244Gb, 32 cores) with spark in standalone mode with 220G
 executor
 and driver memory, and using the kyroserializer.

 Any ideas on what could be causing this hang?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-hangs-at-broadcasting-during-a-filter-tp24143.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




HiveContext error

2015-08-05 Thread Stefan Panayotov
Hello,
 
I am trying to define an external Hive table from Spark HiveContext like the 
following:
 
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
 
hiveCtx.sql(s"""CREATE EXTERNAL TABLE IF NOT EXISTS Rentrak_Ratings (Version 
string, Gen_Date string, Market_Number string, Market_Name string, Time_Zone 
string, Number_Households string,
 | DateTime string, Program_Start_Time string, Program_End_Time string, Station 
string, Station_Name string, Call_Sign string, Network_Name string, Program 
string,
 | Series_Name string, Series_Number string, Episode_Number string, 
Episode_Title string, Demographic string, Demographic_Name string, HHUniverse 
string,
 | Share_15min_Segment string, PHUT_15min_Segment string, Rating_15min_Segment 
string, AV_Audience_15min_Segment string)
 | PARTITIONED BY (year INT, month INT)
 | ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'""".stripMargin)

And I am getting the following error:
 
org.apache.spark.sql.execution.QueryExecutionException: FAILED: Hive Internal 
Error: java.lang.ClassNotFoundException(org.apache.hadoop.hive.ql.hooks.ATSHook)

at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:324)
at 
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:292)

at 
org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)

at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)

at 
org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)

at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)

at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)

        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)

        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)

        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)

        at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:103)

        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)

        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)

        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)

        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)

        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)

        at $iwC$$iwC$$iwC.<init>(<console>:45)

        at $iwC$$iwC.<init>(<console>:47)

        at $iwC.<init>(<console>:49)

        at <init>(<console>:51)

        at .<init>(<console>:55)

        at .<clinit>(<console>)

        at .<init>(<console>:7)

        at .<clinit>(<console>)

 
Can anybody help please?


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

Re: Label based MLLib MulticlassMetrics is buggy

2015-08-05 Thread Feynman Liang
1.5 has not yet been released; what is the commit hash that you are
building?

On Wed, Aug 5, 2015 at 10:29 AM, Hayri Volkan Agun volkana...@gmail.com
wrote:

 Hi,

 In Spark 1.5 I saw a result of precision 1.0 and recall 0.01 for decision
 tree classification.
 While precision is a hundred percent, the recall shouldn't be so small... I checked the
 code, and everything seems OK,
 but why I got such a result is unexplainable. As far as I understand from the
 Scala code, the row sums are the actual
 class counts and the column sums are the prediction counts; am I right?
 I am doing additional tests for comparison with my own code...
 I attached a document for my reuters tests on page 3.
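 (Not from the original message, but one way to sanity-check this, sketched under the
 assumption that the metrics come from an RDD of (prediction, label) pairs; in the
 confusion matrix, rows are actual classes and columns are predicted classes, both
 ordered as in metrics.labels:)

 import org.apache.spark.mllib.evaluation.MulticlassMetrics

 val predictionAndLabels = sc.parallelize(Seq((0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (0.0, 1.0)))  // toy data
 val metrics = new MulticlassMetrics(predictionAndLabels)
 val cm = metrics.confusionMatrix
 metrics.labels.zipWithIndex.foreach { case (label, i) =>
   val colSum = (0 until cm.numRows).map(r => cm(r, i)).sum   // all predictions of this label
   val rowSum = (0 until cm.numCols).map(c => cm(i, c)).sum   // all actual instances of this label
   val precision = if (colSum > 0) cm(i, i) / colSum else 0.0
   val recall    = if (rowSum > 0) cm(i, i) / rowSum else 0.0
   println(s"label=$label precision=$precision (mllib=${metrics.precision(label)}) " +
           s"recall=$recall (mllib=${metrics.recall(label)})")
 }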


 On Wed, Aug 5, 2015 at 7:57 PM, Feynman Liang fli...@databricks.com
 wrote:

 Also, what version of Spark are you using?

 On Wed, Aug 5, 2015 at 9:57 AM, Feynman Liang fli...@databricks.com
 wrote:

 Hi Hayri,

 Can you provide a sample of the expected and actual results?

 Feynman

 On Wed, Aug 5, 2015 at 6:19 AM, Hayri Volkan Agun volkana...@gmail.com
 wrote:

 The results in MulticlassMetrics is totally wrong. They are improperly
 calculated.
 Confusion matrix may be true I don't know but for each label scores are
 wrong.

 --
 Hayri Volkan Agun
 PhD. Student - Anadolu University






 --
 Hayri Volkan Agun
 PhD. Student - Anadolu University



SparkConf ignoring keys

2015-08-05 Thread Corey Nolet
I've been using SparkConf on my project for quite some time now to store
configuration information for its various components. This has worked very
well thus far in situations where I have control over the creation of the
SparkContext & the SparkConf.

I have run into a bit of a problem trying to integrate this same approach
to the use of the shell, however. I have a bunch of properties in a
properties file that are shared across several different types of
applications (web containers, etc...) but the SparkConf ignores these
properties because they aren't prefixed with spark.*

Is this really necessary? It's not really stopping people from adding their
own properties and it limits the power of being able to utilize one central
configuration object.
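(Not an official API, just a hedged workaround sketch: since SparkConf only auto-loads
system properties that start with "spark.", you can re-prefix custom keys from a shared
properties file before setting them. The file path and the "spark.myapp." prefix are
assumptions.)

import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.SparkConf

val props = new Properties()
props.load(new FileInputStream("/etc/myapp/shared.properties"))
val conf = new SparkConf()
props.asScala.foreach { case (k, v) =>
  val key = if (k.startsWith("spark.")) k else s"spark.myapp.$k"
  conf.set(key, v)   // SparkConf.set itself accepts arbitrary keys
}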


Re: large scheduler delay in pyspark

2015-08-05 Thread ayan guha
It seems you want to dedupe your data after the merge, so set(a+b) should
also work... you may ditch the list comprehension operation.
On 5 Aug 2015 23:55, gen tang gen.tan...@gmail.com wrote:

 Hi,
 Thanks a lot for your reply.


 It seems that it is because of the slowness of the second code.
 I rewrite code as list(set([i.items for i in a] + [i.items for i in b])).
 The program returns normal.

 By the way, I find that when the computation is running, UI will show
 scheduler delay. However, it is not scheduler delay. When computation
 finishes, UI will show correct scheduler delay time.

 Cheers
 Gen


 On Tue, Aug 4, 2015 at 3:13 PM, Davies Liu dav...@databricks.com wrote:

 On Mon, Aug 3, 2015 at 9:00 AM, gen tang gen.tan...@gmail.com wrote:
  Hi,
 
  Recently, I met some problems about scheduler delay in pyspark. I worked
  several days on this problem, but not success. Therefore, I come to
 here to
  ask for help.
 
  I have a key_value pair rdd like rdd[(key, list[dict])] and I tried to
 merge
  value by adding two list
 
  if I do reduceByKey as follows:
 rdd.reduceByKey(lambda a, b: a+b)
  It works fine, scheduler delay is less than 10s. However if I do
  reduceByKey:
 def f(a, b):
 for i in b:
  if i not in a:
 a.append(i)
 return a
rdd.reduceByKey(f)

 Is it possible that you have large object that is also named `i` or `a`
 or `b`?

 Btw, the second one could be slow than first one, because you try to
 lookup
 a object in a list, that is O(N), especially when the object is large
 (dict).

  It will cause very large scheduler delay, about 15-20 mins.(The data I
 deal
  with is about 300 mb, and I use 5 machine with 32GB memory)

 If you see scheduler delay, it means there may be a large broadcast
 involved.

  I know the second code is not the same as the first. In fact, my
 purpose is
  to implement the second, but not work. So I try the first one.
  I don't know whether this is related to the data(with long string) or
 Spark
  on Yarn. But the first code works fine on the same data.
 
  Is there any way to find out the log when spark stall in scheduler
 delay,
  please? Or any ideas about this problem?
 
  Thanks a lot in advance for your help.
 
  Cheers
  Gen
 
 





How to read gzip data in Spark - Simple question

2015-08-05 Thread ๏̯͡๏
I have csv data that is embedded in gzip format on HDFS.

*With Pig*

a = load
'/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using
PigStorage();

b = limit a 10

(2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)

(2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


However with Spark

val rowStructText =
sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")

val x = rowStructText.map(s => {

println(s)

s}

)

x.count

Questions

1) x.count always shows 67 irrespective of the path i change in
sc.parallelize

2) It shows x as RDD[Char] instead of String

3) println() never emits the rows.

Any suggestions

-Deepak



-- 
Deepak


Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread Philip Weaver
This message means that java.util.Date is not supported by Spark DataFrame.
You'll need to use java.sql.Date, I believe.
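(Not from the original reply, just a hedged sketch of the substitution: wrap the parsed
java.util.Date in java.sql.Date so the case class schema can be inferred; the case class
and field names here are illustrative.)

import java.sql.Date
import java.text.SimpleDateFormat

def parseSqlDate(dateStr: String): Date =
  new Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime)

case class SummaryRow(eventDate: Date, count: Long)
// e.g. sc.parallelize(Seq(SummaryRow(parseSqlDate("2015-07-27"), 1L))).toDF()
//      (requires import sqlContext.implicits._ for toDF)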

On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 That seems to be working. However, I see a new exception.

 Code:
 def formatStringAsDate(dateStr: String) = new
 SimpleDateFormat("yyyy-MM-dd").parse(dateStr)


 //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
 val rowStructText =
 sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz")
 case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String,
 f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
 f12: Integer, f13: Integer, f14: String)

 val summary  = rowStructText.map(s => s.split(",")).map(
 s => Summary(formatStringAsDate(s(0)),
 s(1).replaceAll("\"", "").toLong,
 s(3).replaceAll("\"", "").toLong,
 s(4).replaceAll("\"", "").toInt,
 s(5).replaceAll("\"", ""),
 s(6).replaceAll("\"", "").toInt,
 formatStringAsDate(s(7)),
 formatStringAsDate(s(8)),
 s(9).replaceAll("\"", "").toInt,
 s(10).replaceAll("\"", "").toInt,
 s(11).replaceAll("\"", "").toFloat,
 s(12).replaceAll("\"", "").toInt,
 s(13).replaceAll("\"", "").toInt,
 s(14).replaceAll("\"", "")
 )
 ).toDF()
 bank.registerTempTable("summary")


 //Output
 import java.text.SimpleDateFormat import java.util.Calendar import
 java.util.Date formatStringAsDate: (dateStr: String)java.util.Date
 rowStructText: org.apache.spark.rdd.RDD[String] =
 /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz
 MapPartitionsRDD[105] at textFile at console:60 defined class Summary x:
 org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at
 console:61 java.lang.UnsupportedOperationException: Schema for type
 java.util.Date is not supported at
 org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)


 Any suggestions

 On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 The parallelize method does not read the contents of a file. It simply
  takes a collection and distributes it to the cluster. In this case, the
  String is a collection of 67 characters.

 Use sc.textFile instead of sc.parallelize, and it should work as you want.

 On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I have csv data that is embedded in gzip format on HDFS.

 *With Pig*

 a = load
 '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using
 PigStorage();

 b = limit a 10


 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)


 (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


 However with Spark

 val rowStructText =
 sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz)

 val x = rowStructText.map(s = {

 println(s)

 s}

 )

 x.count

 Questions

 1) x.count always shows 67 irrespective of the path i change in
 sc.parallelize

 2) It shows x as RDD[Char] instead of String

 3) println() never emits the rows.

 Any suggestions

 -Deepak



 --
 Deepak





 --
 Deepak




spark on mesos with docker from private repository

2015-08-05 Thread Eyal Fink
Hi,
My Spark setup is a cluster on top of Mesos using Docker containers.
I want to pull the Docker images from a private repository (currently gcr.io),
and I can't get the authentication to work.

I know how to generate a .dockercfg file  (running on GCE, using gcloud
docker -a).
My problem is that, as far as I understand, I need this file in the root of
the executor's working directory, and I can't find a way to make the Spark
executor pull this file in (not without changing Spark code).

Am I missing something?
It seems that Spark does support Mesos+Docker, so I wonder what other people
with this setup are doing?


Thanks
Eyal


Re: Reliable Streaming Receiver

2015-08-05 Thread Sourabh Chandak
Thanks Tathagata. I tried that but BlockGenerator internally uses
SystemClock which is again private.

We are using DSE, so we are stuck with Spark 1.2 and hence can't use the
receiver-less version. Is it possible to use the same code as a separate API with 1.2?

Thanks,
Sourabh

On Wed, Aug 5, 2015 at 6:13 PM, Tathagata Das t...@databricks.com wrote:

  You could very easily strip out the BlockGenerator code from the Spark
 source code and use it directly in the same way the Reliable Kafka Receiver
 uses it. BTW, you should know that we will be deprecating the receiver
 based approach for the Direct Kafka approach. That is quite flexible, can
 give exactly-once guarantee without WAL, and is more robust and performant.
 Consider using it.


 On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak sourabh3...@gmail.com
 wrote:

 Hi,

 I am trying to replicate the Kafka Streaming Receiver for a custom
 version of Kafka and want to create a Reliable receiver. The current
 implementation uses BlockGenerator which is a private class inside Spark
 streaming hence I can't use that in my code. Can someone help me with some
 resources to tackle this issue?



 Thanks,
 Sourabh





Re: Newbie question: can shuffle avoid writing and reading from disk?

2015-08-05 Thread Muler
Thanks!

On Wed, Aug 5, 2015 at 5:24 PM, Saisai Shao sai.sai.s...@gmail.com wrote:

 Yes, finally shuffle data will be written to disk for reduce stage to
 pull, no matter how large you set to shuffle memory fraction.

 Thanks
 Saisai

 On Thu, Aug 6, 2015 at 7:50 AM, Muler mulugeta.abe...@gmail.com wrote:

 thanks, so if I have enough large memory (with enough
 spark.shuffle.memory) then shuffle (in-memory shuffle) spill doesn't happen
 (per node) but still shuffle data has to be ultimately written to disk so
 that reduce stage pulls if across network?

 On Wed, Aug 5, 2015 at 4:40 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Muler,

 Shuffle data will be written to disk, no matter how large memory you
 have, large memory could alleviate shuffle spill where temporary file will
 be generated if memory is not enough.

 Yes, each node writes shuffle data to file and pulled from disk in
 reduce stage from network framework (default is Netty).

 Thanks
 Saisai

 On Thu, Aug 6, 2015 at 7:10 AM, Muler mulugeta.abe...@gmail.com wrote:

 Hi,

 Consider I'm running WordCount with 100m of data on 4 node cluster.
 Assuming my RAM size on each node is 200g and i'm giving my executors 100g
 (just enough memory for 100m data)


1. If I have enough memory, can Spark 100% avoid writing to disk?
2. During shuffle, where results have to be collected from nodes,
does each node write to disk and then the results are pulled from disk? 
 If
not, what is the API that is being used to pull data from nodes across 
 the
cluster? (I'm thinking what Scala or Java packages would allow you to 
 read
in-memory data from other machines?)

 Thanks,







Re: How to connect to remote HDFS programmatically to retrieve data, analyse it and then write the data back to HDFS?

2015-08-05 Thread Ted Yu
Please see the comments at the tail of SPARK-2356

Cheers

On Wed, Aug 5, 2015 at 6:04 PM, Ashish Dutt ashish.du...@gmail.com wrote:

 *Use Case:* To automate the process of data extraction (HDFS), data
 analysis (pySpark/sparkR) and saving the data back to HDFS
 programmatically.

 *Prospective solutions:*

 1. Create a remote server connectivity program in an IDE like pyCharm or
 RStudio and use it to retrieve the data from HDFS or else
 2. Create the data retrieval code in python or R and then point the IDE to
 the remote server using TCP.

 *Problem:* How to achieve either of the prospective solution 1 or 2
 defined above? Do you have any better solution then these, if yes please
 share?

 *What have I tried so far?*

 The server and 3 namenodes already installed with pyspark and I have
 checked pyspark works in standalone mode on all four servers. Pyspark works
 in standalone mode on my laptop too.

 I use the following code but I am not able to connect to the remote server.

 import os
 import sys
 try:
 from pyspark import SparkContext
 from pyspark import SparkConf
 print (Pyspark sucess)
 except ImportError as e:
 print (Error importing Spark Modules, e)

 conf = SparkConf()
 conf.setMaster(spark://10.210.250.400:7077)
 conf.setAppName(First_Remote_Spark_Program)
 sc = SparkContext(conf=conf)
 print (connection succeeded with Master,conf)
 data = [1, 2, 3, 4, 5]
 distData = sc.parallelize(data)
 print(distData)

 The stack trace of error is

 Pyspark sucess
 15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop library 
 for your platform... using builtin-java classes where applicable
 15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in the 
 hadoop binary path
 java.io.IOException: Could not locate executable null\bin\winutils.exe in the 
 Hadoop binaries.
 at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
 at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
 at org.apache.hadoop.util.Shell.clinit(Shell.java:326)
 at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76)
 at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
 at org.apache.hadoop.security.Groups.init(Groups.java:77)
 at 
 org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
 at 
 org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
 at 
 org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
 at 
 org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
 at 
 org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
 at 
 org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
 at 
 org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
 at 
 org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
 at org.apache.spark.SparkContext.init(SparkContext.scala:301)
 at 
 org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at 
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at 
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:214)
 at 
 py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
 at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
 at py4j.GatewayConnection.run(GatewayConnection.java:207)
 at java.lang.Thread.run(Thread.java:745)
 15/08/01 14:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. 
 Attempting port 4041.
 15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to 
 akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: 
 Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077
 15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable remote 
 address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address is now gated 
 for 5000 ms, all messages to this address will be delivered to dead letters. 
 Reason: Connection refused: no further information: /10.210.250.400:7077
 15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to 
 akka.tcp://sparkMaster@10.210.250.400:7077: akka.remote.InvalidAssociation: 
 Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077
 15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable remote 
 address 

RE: How to read gzip data in Spark - Simple question

2015-08-05 Thread Ganelin, Ilya
Have you tried reading the spark documentation?

http://spark.apache.org/docs/latest/programming-guide.html



Thank you,
Ilya Ganelin



-Original Message-
From: ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.com]
Sent: Thursday, August 06, 2015 12:41 AM Eastern Standard Time
To: Philip Weaver
Cc: user
Subject: Re: How to read gzip data in Spark - Simple question

how do i persist the RDD to HDFS ?

On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver 
philip.wea...@gmail.com wrote:
This message means that java.util.Date is not supported by Spark DataFrame. 
You'll need to use java.sql.Date, I believe.

On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.com wrote:
That seem to be working. however i see a new exception

Code:
def formatStringAsDate(dateStr: String) = new 
SimpleDateFormat(-MM-dd).parse(dateStr)

//(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
val rowStructText = 
sc.textFile(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz)
case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String, f6: 
Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: 
Integer, f13: Integer, f14: String)

val summary  = rowStructText.map(s = s.split(,)).map(
s = Summary(formatStringAsDate(s(0)),
s(1).replaceAll(\, ).toLong,
s(3).replaceAll(\, ).toLong,
s(4).replaceAll(\, ).toInt,
s(5).replaceAll(\, ),
s(6).replaceAll(\, ).toInt,
formatStringAsDate(s(7)),
formatStringAsDate(s(8)),
s(9).replaceAll(\, ).toInt,
s(10).replaceAll(\, ).toInt,
s(11).replaceAll(\, ).toFloat,
s(12).replaceAll(\, ).toInt,
s(13).replaceAll(\, ).toInt,
s(14).replaceAll(\, )
)
).toDF()
bank.registerTempTable(summary)


//Output
import java.text.SimpleDateFormat import java.util.Calendar import 
java.util.Date formatStringAsDate: (dateStr: String)java.util.Date 
rowStructText: org.apache.spark.rdd.RDD[String] = 
/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz 
MapPartitionsRDD[105] at textFile at console:60 defined class Summary x: 
org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at console:61 
java.lang.UnsupportedOperationException: Schema for type java.util.Date is not 
supported at 
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
 at 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)

Any suggestions

On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver 
philip.wea...@gmail.com wrote:
The parallelize method does not read the contents of a file. It simply takes a 
collection and distributes it to the cluster. In this case, the String is a 
collection 67 characters.

Use sc.textFile instead of sc.parallelize, and it should work as you want.

On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.com wrote:
I have csv data that is embedded in gzip format on HDFS.

With Pig

a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' 
using PigStorage();

b = limit a 10

(2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)

(2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


However with Spark

val rowStructText = 
sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz)

val x = rowStructText.map(s = {

println(s)

s}

)

x.count

Questions

1) x.count always shows 67 irrespective of the path i change in sc.parallelize

2) It shows x as RDD[Char] instead of String

3) println() never emits the rows.

Any suggestions

-Deepak


--
Deepak





--
Deepak





--
Deepak



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote:

 We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396

 Could you give it a shot to see whether it helps in your case? We've
 observed ~50x performance boost with schema merging turned on.

 Cheng


 On 8/6/15 8:26 AM, Philip Weaver wrote:

 I have a parquet directory that was produced by partitioning by two keys,
 e.g. like this:

 df.write.partitionBy("a", "b").parquet("asdf")


 There are 35 values of a, and about 1100-1200 values of b for each
 value of a, for a total of over 40,000 partitions.

 Before running any transformations or actions on the DataFrame, just
 initializing it like this takes *2 minutes*:

 val df = sqlContext.read.parquet("asdf")


 Is this normal? Is this because it is doing some bookeeping to discover
 all the partitions? Is it perhaps having to merge the schema from each
 partition? Would you expect it to get better or worse if I subpartition by
 another key?

 - Philip






Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Cheng Lian

We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your case? We've 
observed ~50x performance boost with schema merging turned on.
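If you want to compare with and without merging on 1.5, a minimal sketch
(assuming the Parquet data source option named mergeSchema, and the same path
as in your example):

// explicitly enable schema merging for this read
val df = sqlContext.read
  .option("mergeSchema", "true")
  .parquet("asdf")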


Cheng

On 8/6/15 8:26 AM, Philip Weaver wrote:
I have a parquet directory that was produced by partitioning by two 
keys, e.g. like this:


df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of a, and about 1100-1200 values of b for each 
value of a, for a total of over 40,000 partitions.


Before running any transformations or actions on the DataFrame, just 
initializing it like this takes *2 minutes*:


val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookeeping to 
discover all the partitions? Is it perhaps having to merge the schema 
from each partition? Would you expect it to get better or worse if I 
subpartition by another key?


- Philip






Re: Reliable Streaming Receiver

2015-08-05 Thread Dibyendu Bhattacharya
Hi,

You can try this Kafka Consumer for Spark, which is also part of Spark
Packages: https://github.com/dibbhatt/kafka-spark-consumer

Regards,
Dibyendu

On Thu, Aug 6, 2015 at 6:48 AM, Sourabh Chandak sourabh3...@gmail.com
wrote:

 Thanks Tathagata. I tried that but BlockGenerator internally uses
 SystemClock which is again private.

 We are using DSE so stuck with Spark 1.2 hence can't use the receiver-less
 version. Is it possible to use the same code as a separate API with 1.2?

 Thanks,
 Sourabh

 On Wed, Aug 5, 2015 at 6:13 PM, Tathagata Das t...@databricks.com wrote:

  You could very easily strip out the BlockGenerator code from the Spark
 source code and use it directly in the same way the Reliable Kafka Receiver
 uses it. BTW, you should know that we will be deprecating the receiver
 based approach for the Direct Kafka approach. That is quite flexible, can
 give exactly-once guarantee without WAL, and is more robust and performant.
 Consider using it.


 On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak sourabh3...@gmail.com
 wrote:

 Hi,

 I am trying to replicate the Kafka Streaming Receiver for a custom
 version of Kafka and want to create a Reliable receiver. The current
 implementation uses BlockGenerator which is a private class inside Spark
 streaming hence I can't use that in my code. Can someone help me with some
 resources to tackle this issue?



 Thanks,
 Sourabh






Re: Upgrade of Spark-Streaming application

2015-08-05 Thread Shushant Arora
Hi

Regarding checkpointing and the fromOffsets argument: say that the first time
my app starts I don't have any previous state stored and I want to start
consuming from the largest offset.

1. Is it possible to specify that via the fromOffsets API? I don't want to use
the other API, which returns JavaPairInputDStream, while the fromOffsets API
returns JavaDStream, since I want to keep the further flow of my app the same
in both cases.


2. To achieve that (the same flow in both cases), if I use the different APIs
in the two cases and convert the JavaPairInputDStream to a JavaDStream using a
map function, I am no longer able to cast the transformed stream to
HasOffsetRanges to get the offsets of the current run. It throws a class cast
exception when I do

OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();

on the transformed stream:

java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
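One pattern that usually avoids this (a sketch in Scala rather than the Java
API, and assuming the direct stream variable name directKafkaStream) is to
capture the offsets in a transform on the original stream before any map
changes the RDD type, since only the first RDD in the chain is a KafkaRDD
implementing HasOffsetRanges:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  // still a KafkaRDD here, so the cast succeeds
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(_._2).foreachRDD { rdd =>
  // process the values; offsetRanges now holds this batch's offsets
}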




On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger c...@koeninger.org wrote:

 You can't use checkpoints across code upgrades.  That may or may not
 change in the future, but for now that's a limitation of spark checkpoints
 (regardless of whether you're using Kafka).

 Some options:

 - Start up the new job on a different cluster, then kill the old job once
 it's caught up to where the new job started.  If you care about duplicate
 work, you should be doing idempotent / transactional writes anyway, which
 should take care of the overlap between the two.  If you're doing batches,
 you may need to be a little more careful about handling batch boundaries

 - Store the offsets somewhere other than the checkpoint, and provide them
 on startup using the fromOffsets argument to createDirectStream





 On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro nibbi...@gmail.com
 wrote:

 Hi,
 I've read about the recent updates about spark-streaming integration with
 Kafka (I refer to the new approach without receivers).
 In the new approach, metadata are persisted in checkpoint folders on HDFS
 so that the SparkStreaming context can be recreated in case of failures.
 This means that the streaming application will restart from the where it
 exited and the message consuming process continues with new messages only.
 Also, if I manually stop the streaming process and recreate the context
 from checkpoint (using an approach similar to
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
 the behavior would be the same.

 Now, suppose I want to change something in the software and modify the
 processing pipeline.
 Can spark use the previous checkpoint to recreate the new application?
 Will I ever be able to upgrade the software without processing all the
 messages in Kafka again?

 Regards,
 Nicola





Re: Reliable Streaming Receiver

2015-08-05 Thread Tathagata Das
 You could very easily strip out the BlockGenerator code from the Spark
source code and use it directly in the same way the Reliable Kafka Receiver
uses it. BTW, you should know that we will be deprecating the receiver
based approach for the Direct Kafka approach. That is quite flexible, can
give exactly-once guarantee without WAL, and is more robust and performant.
Consider using it.
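For reference, a minimal sketch of the direct approach (assuming Spark 1.3+
with the spark-streaming-kafka artifact on the classpath; the broker list and
topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// no receiver and no WAL; the stream tracks Kafka offsets itself
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))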


On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak sourabh3...@gmail.com
wrote:

 Hi,

 I am trying to replicate the Kafka Streaming Receiver for a custom version
 of Kafka and want to create a Reliable receiver. The current implementation
 uses BlockGenerator which is a private class inside Spark streaming hence I
 can't use that in my code. Can someone help me with some resources to
 tackle this issue?



 Thanks,
 Sourabh



Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread Philip Weaver
The parallelize method does not read the contents of a file. It simply
takes a collection and distributes it to the cluster. In this case, the
String is a collection of 67 characters.

Use sc.textFile instead of sc.parallelize, and it should work as you want.
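A minimal sketch of that change for the gzipped file in your mail (path copied
from there; note that a .gz file is not splittable, so each file ends up as a
single partition):

// textFile decompresses gzip transparently and yields one String per line
val rowStructText =
  sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz")
rowStructText.take(10).foreach(println)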

On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have csv data that is embedded in gzip format on HDFS.

 *With Pig*

 a = load
 '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using
 PigStorage();

 b = limit a 10


 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)


 (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


 However with Spark

 val rowStructText =
 sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz)

 val x = rowStructText.map(s = {

 println(s)

 s}

 )

 x.count

 Questions

 1) x.count always shows 67 irrespective of the path i change in
 sc.parallelize

 2) It shows x as RDD[Char] instead of String

 3) println() never emits the rows.

 Any suggestions

 -Deepak



 --
 Deepak




Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread ๏̯͡๏
That seems to be working; however, I see a new exception.

Code:
def formatStringAsDate(dateStr: String) = new
SimpleDateFormat("yyyy-MM-dd").parse(dateStr)

//(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
val rowStructText =
sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz")
case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String,
f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
f12: Integer, f13: Integer, f14: String)

val summary = rowStructText.map(s => s.split(",")).map(
s => Summary(formatStringAsDate(s(0)),
s(1).replaceAll("\"", "").toLong,
s(3).replaceAll("\"", "").toLong,
s(4).replaceAll("\"", "").toInt,
s(5).replaceAll("\"", ""),
s(6).replaceAll("\"", "").toInt,
formatStringAsDate(s(7)),
formatStringAsDate(s(8)),
s(9).replaceAll("\"", "").toInt,
s(10).replaceAll("\"", "").toInt,
s(11).replaceAll("\"", "").toFloat,
s(12).replaceAll("\"", "").toInt,
s(13).replaceAll("\"", "").toInt,
s(14).replaceAll("\"", "")
)
).toDF()
bank.registerTempTable("summary")


//Output
import java.text.SimpleDateFormat import java.util.Calendar import
java.util.Date formatStringAsDate: (dateStr: String)java.util.Date
rowStructText: org.apache.spark.rdd.RDD[String] =
/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz
MapPartitionsRDD[105] at textFile at console:60 defined class Summary x:
org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at
console:61 java.lang.UnsupportedOperationException: Schema for type
java.util.Date is not supported at
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
at
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
at
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)


Any suggestions

On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com
wrote:

 The parallelize method does not read the contents of a file. It simply
 takes a collection and distributes it to the cluster. In this case, the
 String is a collection 67 characters.

 Use sc.textFile instead of sc.parallelize, and it should work as you want.

 On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have csv data that is embedded in gzip format on HDFS.

 *With Pig*

 a = load
 '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using
 PigStorage();

 b = limit a 10


 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)


 (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


 However with Spark

 val rowStructText =
 sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz)

 val x = rowStructText.map(s = {

 println(s)

 s}

 )

 x.count

 Questions

 1) x.count always shows 67 irrespective of the path i change in
 sc.parallelize

 2) It shows x as RDD[Char] instead of String

 3) println() never emits the rows.

 Any suggestions

 -Deepak



 --
 Deepak





-- 
Deepak


Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread ๏̯͡๏
How do I persist the RDD to HDFS?
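A minimal sketch (the output path is a placeholder): any RDD can be written
with saveAsTextFile, which creates one part file per partition under the given
directory.

rowStructText.saveAsTextFile("hdfs:///user/zeppelin/output/rowStructText")

A DataFrame can likewise be persisted, e.g. with saveAsParquetFile in the
1.3/1.4 API.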

On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver philip.wea...@gmail.com
wrote:

 This message means that java.util.Date is not supported by Spark
 DataFrame. You'll need to use java.sql.Date, I believe.

 On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 That seem to be working. however i see a new exception

 Code:
 def formatStringAsDate(dateStr: String) = new
 SimpleDateFormat(-MM-dd).parse(dateStr)


 //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
 val rowStructText =
 sc.textFile(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz)
 case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 :
 String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11:
 Float, f12: Integer, f13: Integer, f14: String)

 val summary  = rowStructText.map(s = s.split(,)).map(
 s = Summary(formatStringAsDate(s(0)),
 s(1).replaceAll(\, ).toLong,
 s(3).replaceAll(\, ).toLong,
 s(4).replaceAll(\, ).toInt,
 s(5).replaceAll(\, ),
 s(6).replaceAll(\, ).toInt,
 formatStringAsDate(s(7)),
 formatStringAsDate(s(8)),
 s(9).replaceAll(\, ).toInt,
 s(10).replaceAll(\, ).toInt,
 s(11).replaceAll(\, ).toFloat,
 s(12).replaceAll(\, ).toInt,
 s(13).replaceAll(\, ).toInt,
 s(14).replaceAll(\, )
 )
 ).toDF()
 bank.registerTempTable(summary)


 //Output
 import java.text.SimpleDateFormat import java.util.Calendar import
 java.util.Date formatStringAsDate: (dateStr: String)java.util.Date
 rowStructText: org.apache.spark.rdd.RDD[String] =
 /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz
 MapPartitionsRDD[105] at textFile at console:60 defined class Summary x:
 org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at
 console:61 java.lang.UnsupportedOperationException: Schema for type
 java.util.Date is not supported at
 org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)


 Any suggestions

 On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 The parallelize method does not read the contents of a file. It simply
 takes a collection and distributes it to the cluster. In this case, the
 String is a collection 67 characters.

 Use sc.textFile instead of sc.parallelize, and it should work as you
 want.

 On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I have csv data that is embedded in gzip format on HDFS.

 *With Pig*

 a = load
 '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using
 PigStorage();

 b = limit a 10


 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)


 (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


 However with Spark

 val rowStructText =
 sc.parallelize(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz)

 val x = rowStructText.map(s = {

 println(s)

 s}

 )

 x.count

 Questions

 1) x.count always shows 67 irrespective of the path i change in
 sc.parallelize

 2) It shows x as RDD[Char] instead of String

 3) println() never emits the rows.

 Any suggestions

 -Deepak



 --
 Deepak





 --
 Deepak





-- 
Deepak


Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread ๏̯͡๏
Code:

val summary = rowStructText.map(s => s.split(",")).map(
{
s =>
Summary(formatStringAsDate(s(0)),
s(1).replaceAll("\"", "").toLong,
s(3).replaceAll("\"", "").toLong,
s(4).replaceAll("\"", "").toInt,
s(5).replaceAll("\"", ""),
s(6).replaceAll("\"", "").toInt,
formatStringAsDate(s(7)),
formatStringAsDate(s(8)),
s(9).replaceAll("\"", "").toInt,
s(10).replaceAll("\"", "").toInt,
s(11).replaceAll("\"", "").toFloat,
s(12).replaceAll("\"", "").toInt,
s(13).replaceAll("\"", "").toInt,
s(14).replaceAll("\"", "")
)
}
)
summary.saveAsTextFile("sparkO")

Exception:
import java.text.SimpleDateFormat import java.util.Calendar import
java.sql.Date import org.apache.spark.storage.StorageLevel
formatStringAsDate: (dateStr: String)java.sql.Date rowStructText:
org.apache.spark.rdd.RDD[String] =
/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz
MapPartitionsRDD[263] at textFile at console:154 defined class Summary
summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[265] at map
at console:159 sumDF: org.apache.spark.sql.DataFrame = [f1: date, f2:
bigint, f3: bigint, f4: int, f5: string, f6: int, f7: date, f8: date, f9:
int, f10: int, f11: float, f12: int, f13: int, f14: string]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 45.0 failed 4 times, most recent failure: Lost task 0.3 in stage
45.0 (TID 1872, datanode-6-3486.phx01.dev.ebayc3.com):
java.lang.ArrayIndexOutOfBoundsException: 1 at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:163)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:161)
at scala.collection.Iterator$$anon
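The ArrayIndexOutOfBoundsException: 1 usually means some lines split into
fewer fields than the code indexes (blank lines, or rows whose trailing empty
fields were dropped, since split(",") discards trailing empty strings). A
minimal guard on top of the code above (a sketch, assuming 15 fields per row
as in the Summary case class):

// keep trailing empty fields with limit -1, and drop rows that are too short
val fields = rowStructText.map(_.split(",", -1)).filter(_.length >= 15)

and then build Summary from fields instead of splitting again inside the map.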

On Wed, Aug 5, 2015 at 9:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 how do i persist the RDD to HDFS ?

 On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 This message means that java.util.Date is not supported by Spark
 DataFrame. You'll need to use java.sql.Date, I believe.

 On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 That seem to be working. however i see a new exception

 Code:
 def formatStringAsDate(dateStr: String) = new
 SimpleDateFormat(-MM-dd).parse(dateStr)


 //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
 val rowStructText =
 sc.textFile(/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz)
 case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 :
 String, f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11:
 Float, f12: Integer, f13: Integer, f14: String)

 val summary  = rowStructText.map(s = s.split(,)).map(
 s = Summary(formatStringAsDate(s(0)),
 s(1).replaceAll(\, ).toLong,
 s(3).replaceAll(\, ).toLong,
 s(4).replaceAll(\, ).toInt,
 s(5).replaceAll(\, ),
 s(6).replaceAll(\, ).toInt,
 formatStringAsDate(s(7)),
 formatStringAsDate(s(8)),
 s(9).replaceAll(\, ).toInt,
 s(10).replaceAll(\, ).toInt,
 s(11).replaceAll(\, ).toFloat,
 s(12).replaceAll(\, ).toInt,
 s(13).replaceAll(\, ).toInt,
 s(14).replaceAll(\, )
 )
 ).toDF()
 bank.registerTempTable(summary)


 //Output
 import java.text.SimpleDateFormat import java.util.Calendar import
 java.util.Date formatStringAsDate: (dateStr: String)java.util.Date
 rowStructText: org.apache.spark.rdd.RDD[String] =
 /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz
 MapPartitionsRDD[105] at textFile at console:60 defined class Summary x:
 org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at
 console:61 java.lang.UnsupportedOperationException: Schema for type
 java.util.Date is not supported at
 org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)


 Any suggestions

 On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 The parallelize method does not read the contents of a file. It simply
 takes a collection and distributes it to the cluster. In this case, the
 String is a collection 67 characters.

 Use sc.textFile instead of sc.parallelize, and it should work as you
 want.

 On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I have csv data that is embedded in gzip format on HDFS.


Newbie question: can shuffle avoid writing and reading from disk?

2015-08-05 Thread Muler
Hi,

Consider I'm running WordCount with 100m of data on a 4-node cluster,
assuming my RAM size on each node is 200g and I'm giving my executors 100g
(just enough memory for the 100m of data):


   1. If I have enough memory, can Spark 100% avoid writing to disk?
   2. During shuffle, where results have to be collected from nodes, does
   each node write to disk and then the results are pulled from disk? If not,
   what is the API that is being used to pull data from nodes across the
   cluster? (I'm thinking what Scala or Java packages would allow you to read
   in-memory data from other machines?)

Thanks,


Re: Newbie question: can shuffle avoid writing and reading from disk?

2015-08-05 Thread Saisai Shao
Hi Muler,

Shuffle data will be written to disk, no matter how much memory you have;
large memory only alleviates shuffle spill, where temporary files are
generated when memory is not enough.

Yes, each node writes its shuffle data to files, and in the reduce stage the
data is pulled from disk through the network framework (Netty by default).

Thanks
Saisai

On Thu, Aug 6, 2015 at 7:10 AM, Muler mulugeta.abe...@gmail.com wrote:

 Hi,

 Consider I'm running WordCount with 100m of data on 4 node cluster.
 Assuming my RAM size on each node is 200g and i'm giving my executors 100g
 (just enough memory for 100m data)


1. If I have enough memory, can Spark 100% avoid writing to disk?
2. During shuffle, where results have to be collected from nodes, does
each node write to disk and then the results are pulled from disk? If not,
what is the API that is being used to pull data from nodes across the
cluster? (I'm thinking what Scala or Java packages would allow you to read
in-memory data from other machines?)

 Thanks,



Pause Spark Streaming reading or sampling streaming data

2015-08-05 Thread foobar
Hi, I have a question about sampling Spark Streaming data, or getting part of
the data. For every minute, I only want the data read in during the first 10
seconds, and discard all data in the next 50 seconds. Is there any way to
pause reading and discard data in that period? I'm doing this to sample from
a stream of huge amount of data, which saves processing time in the
real-time program. Thanks! 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pause-Spark-Streaming-reading-or-sampling-streaming-data-tp24146.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Pause Spark Streaming reading or sampling streaming data

2015-08-05 Thread Heath Guo
Hi, I have a question about sampling Spark Streaming data, or getting part of 
the data. For every minute, I only want the data read in during the first 10 
seconds, and discard all data in the next 50 seconds. Is there any way to pause 
reading and discard data in that period? I'm doing this to sample from a stream 
of huge amount of data, which saves processing time in the real-time program. 
Thanks!


Re: Newbie question: can shuffle avoid writing and reading from disk?

2015-08-05 Thread Muler
Thanks, so if I have large enough memory (with enough spark.shuffle.memory)
then shuffle spill doesn't happen (per node, the shuffle stays in memory), but
shuffle data still has to be ultimately written to disk so that the reduce
stage can pull it across the network?

On Wed, Aug 5, 2015 at 4:40 PM, Saisai Shao sai.sai.s...@gmail.com wrote:

 Hi Muler,

 Shuffle data will be written to disk, no matter how large memory you have,
 large memory could alleviate shuffle spill where temporary file will be
 generated if memory is not enough.

 Yes, each node writes shuffle data to file and pulled from disk in reduce
 stage from network framework (default is Netty).

 Thanks
 Saisai

 On Thu, Aug 6, 2015 at 7:10 AM, Muler mulugeta.abe...@gmail.com wrote:

 Hi,

 Consider I'm running WordCount with 100m of data on 4 node cluster.
 Assuming my RAM size on each node is 200g and i'm giving my executors 100g
 (just enough memory for 100m data)


1. If I have enough memory, can Spark 100% avoid writing to disk?
2. During shuffle, where results have to be collected from nodes,
does each node write to disk and then the results are pulled from disk? If
not, what is the API that is being used to pull data from nodes across the
cluster? (I'm thinking what Scala or Java packages would allow you to read
in-memory data from other machines?)

 Thanks,





Windows function examples in pyspark

2015-08-05 Thread jegordon
Hi to all,

I'm trying to use some window functions (ntile and percentRank) on a
DataFrame but I don't know how to use them.

Can anyone help me with this please? In the Python API documentation
there are no examples of it.

Specifically, I'm trying to get quantiles of a numeric field in my DataFrame.

I'm using Spark 1.4.0

Thanks a lot.
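A minimal sketch of how these are wired up in the 1.4 Python API (assuming a
HiveContext, which window functions require in 1.4, and a DataFrame df with a
numeric column named value; table and column names are placeholders):

from pyspark.sql import HiveContext
from pyspark.sql.window import Window
from pyspark.sql import functions as F

sqlContext = HiveContext(sc)
df = sqlContext.table("my_table")  # placeholder source

# order rows by the numeric field: ntile(4) labels each row with its quartile,
# percentRank() gives the relative rank in [0, 1]
w = Window.orderBy(df["value"])
result = df.select(
    df["value"],
    F.ntile(4).over(w).alias("quartile"),
    F.percentRank().over(w).alias("pct_rank"))
result.show()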



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Windows-function-examples-in-pyspark-tp24144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Starting Spark SQL thrift server from within a streaming app

2015-08-05 Thread Todd Nist
Hi Danniel,

It is possible to create an instance of the Spark SQL Thrift server; however,
it seems like this project is what you may be looking for:

https://github.com/Intel-bigdata/spark-streamingsql

Not 100% sure what your use case is, but you can always convert the data into
a DF and then issue a query against it.  If you want other systems to be able to
query it, then there are numerous connectors to store data into Hive,
Cassandra, HBase, ElasticSearch, ...

To create a instance of a thrift server with its own SQL Context you would
do something like the following:

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveMetastoreTypes._
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.thriftserver._


object MyThriftServer {

  val sparkConf = new SparkConf()
    // master is passed to spark-submit, but could also be specified explicitly
    // .setMaster(sparkMaster)
    .setAppName("My ThriftServer")
    .set("spark.cores.max", "2")
  val sc = new SparkContext(sparkConf)
  val sparkContext = sc
  import sparkContext._
  val sqlContext = new HiveContext(sparkContext)
  import sqlContext._
  import sqlContext.implicits._

  makeRDD((1, "hello") :: (2, "world") :: Nil).toDF.cache().registerTempTable("t")

  HiveThriftServer2.startWithContext(sqlContext)
}

Again, I'm not really clear what your use case is, but it does sound like
the first link above is what you may want.

-Todd

On Wed, Aug 5, 2015 at 1:57 PM, Daniel Haviv 
daniel.ha...@veracity-group.com wrote:

 Hi,
 Is it possible to start the Spark SQL thrift server from with a streaming
 app so the streamed data could be queried as it's goes in ?

 Thank you.
 Daniel



Re: trying to understand yarn-client mode

2015-08-05 Thread nir
Hi DB Tsai-2,

I am trying to run a singleton SparkContext in my container (a Spring Boot
Tomcat container). When my application bootstraps I create the
SparkContext and keep the reference for future job submissions. I got it
working with standalone Spark perfectly, but I am having trouble with the YARN
modes, especially yarn-cluster mode.
What is the new Client(new ClientArguments(args, sparkConf), hadoopConfig,
sparkConf).run API?
How do I submit subsequent requests to Spark after this?

I use Java API but I can use scala too.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/trying-to-understand-yarn-client-mode-tp7925p24145.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Newbie question: can shuffle avoid writing and reading from disk?

2015-08-05 Thread Saisai Shao
Yes, in the end shuffle data will be written to disk for the reduce stage to pull,
no matter how large you set the shuffle memory fraction.

Thanks
Saisai

On Thu, Aug 6, 2015 at 7:50 AM, Muler mulugeta.abe...@gmail.com wrote:

 thanks, so if I have enough large memory (with enough
 spark.shuffle.memory) then shuffle (in-memory shuffle) spill doesn't happen
 (per node) but still shuffle data has to be ultimately written to disk so
 that reduce stage pulls if across network?

 On Wed, Aug 5, 2015 at 4:40 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Muler,

 Shuffle data will be written to disk, no matter how large memory you
 have, large memory could alleviate shuffle spill where temporary file will
 be generated if memory is not enough.

 Yes, each node writes shuffle data to file and pulled from disk in reduce
 stage from network framework (default is Netty).

 Thanks
 Saisai

 On Thu, Aug 6, 2015 at 7:10 AM, Muler mulugeta.abe...@gmail.com wrote:

 Hi,

 Consider I'm running WordCount with 100m of data on 4 node cluster.
 Assuming my RAM size on each node is 200g and i'm giving my executors 100g
 (just enough memory for 100m data)


1. If I have enough memory, can Spark 100% avoid writing to disk?
2. During shuffle, where results have to be collected from nodes,
does each node write to disk and then the results are pulled from disk? 
 If
not, what is the API that is being used to pull data from nodes across 
 the
cluster? (I'm thinking what Scala or Java packages would allow you to 
 read
in-memory data from other machines?)

 Thanks,






Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
I have a parquet directory that was produced by partitioning by two keys,
e.g. like this:

df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of "a", and about 1100-1200 values of "b" for each
value of "a", for a total of over 40,000 partitions.

Before running any transformations or actions on the DataFrame, just
initializing it like this takes *2 minutes*:

val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookkeeping to discover all
the partitions? Is it perhaps having to merge the schema from each
partition? Would you expect it to get better or worse if I subpartition by
another key?

- Philip


Re: Pause Spark Streaming reading or sampling streaming data

2015-08-05 Thread Heath Guo
Hi Dimitris,

Thanks for your reply. Just wondering – are you asking about my streaming input 
source? I implemented a custom receiver and have been using that. Thanks.

From: Dimitris Kouzis - Loukas look...@gmail.com
Date: Wednesday, August 5, 2015 at 5:27 PM
To: Heath Guo heath...@fb.com
Cc: user@spark.apache.org
Subject: Re: Pause Spark Streaming reading or sampling streaming data

What driver do you use? Sounds like something you should do before the driver...

On Thu, Aug 6, 2015 at 12:50 AM, Heath Guo 
heath...@fb.com wrote:
Hi, I have a question about sampling Spark Streaming data, or getting part of 
the data. For every minute, I only want the data read in during the first 10 
seconds, and discard all data in the next 50 seconds. Is there any way to pause 
reading and discard data in that period? I'm doing this to sample from a stream 
of huge amount of data, which saves processing time in the real-time program. 
Thanks!



PySpark in Pycharm- unable to connect to remote server

2015-08-05 Thread Ashish Dutt
Use Case: I want to use my laptop (using Win 7 Professional) to connect to
the CentOS 6.4 master server using PyCharm.

Objective: To write the code in Pycharm on the laptop and then send the job
to the server which will do the processing and should then return the
result back to the laptop or to any other visualizing API.

The server and 3 namenodes already installed with pyspark and I have
checked pyspark works in standalone mode on all four servers. Pyspark works
in standalone mode on my laptop too.

I use the following code but I am not able to connect to the remote server.

import os
import sys
try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    print ("Pyspark sucess")
except ImportError as e:
    print ("Error importing Spark Modules", e)

conf = SparkConf()
conf.setMaster("spark://10.210.250.400:7077")
conf.setAppName("First_Remote_Spark_Program")
sc = SparkContext(conf=conf)
print ("connection succeeded with Master", conf)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
print(distData)

The stack trace of error is

Pyspark sucess
15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in
the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe
in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
at org.apache.hadoop.util.Shell.clinit(Shell.java:326)
at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
at org.apache.hadoop.security.Groups.init(Groups.java:77)
at 
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at 
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at 
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at 
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at 
org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at 
org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
at org.apache.spark.SparkContext.init(SparkContext.scala:301)
at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at 
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
15/08/01 14:08:25 WARN Utils: Service 'SparkUI' could not bind on port
4040. Attempting port 4041.
15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable
remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address
is now gated for 5000 ms, all messages to this address will be
delivered to dead letters. Reason: Connection refused: no further
information: /10.210.250.400:7077
15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable
remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address
is now gated for 5000 ms, all messages to this address will be
delivered to dead letters. Reason: Connection refused: no further
information: /10.210.250.400:7077
15/08/01 14:09:06 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:09:06 WARN Remoting: Tried to associate with unreachable
remote address 

Reliable Streaming Receiver

2015-08-05 Thread Sourabh Chandak
Hi,

I am trying to replicate the Kafka Streaming Receiver for a custom version
of Kafka and want to create a Reliable receiver. The current implementation
uses BlockGenerator which is a private class inside Spark streaming hence I
can't use that in my code. Can someone help me with some resources to
tackle this issue?



Thanks,
Sourabh


How to connect to remote HDFS programmatically to retrieve data, analyse it and then write the data back to HDFS?

2015-08-05 Thread Ashish Dutt
*Use Case:* To automate the process of data extraction (HDFS), data
analysis (pySpark/sparkR) and saving the data back to HDFS
programmatically.

*Prospective solutions:*

1. Create a remote server connectivity program in an IDE like pyCharm or
RStudio and use it to retrieve the data from HDFS or else
2. Create the data retrieval code in python or R and then point the IDE to
the remote server using TCP.

*Problem:* How to achieve either of the prospective solutions 1 or 2 defined
above? Do you have any better solution than these? If yes, please share.

*What have I tried so far?*

The server and 3 namenodes already installed with pyspark and I have
checked pyspark works in standalone mode on all four servers. Pyspark works
in standalone mode on my laptop too.

I use the following code but I am not able to connect to the remote server.

import os
import sys
try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    print ("Pyspark sucess")
except ImportError as e:
    print ("Error importing Spark Modules", e)

conf = SparkConf()
conf.setMaster("spark://10.210.250.400:7077")
conf.setAppName("First_Remote_Spark_Program")
sc = SparkContext(conf=conf)
print ("connection succeeded with Master", conf)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
print(distData)

The stack trace of error is

Pyspark sucess
15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in
the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe
in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
at org.apache.hadoop.util.Shell.clinit(Shell.java:326)
at org.apache.hadoop.util.StringUtils.clinit(StringUtils.java:76)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
at org.apache.hadoop.security.Groups.init(Groups.java:77)
at 
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at 
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at 
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at 
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at 
org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at 
org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
at org.apache.spark.SparkContext.init(SparkContext.scala:301)
at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at 
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
15/08/01 14:08:25 WARN Utils: Service 'SparkUI' could not bind on port
4040. Attempting port 4041.
15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable
remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address
is now gated for 5000 ms, all messages to this address will be
delivered to dead letters. Reason: Connection refused: no further
information: /10.210.250.400:7077
15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable
remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address
is now gated for 5000 ms, all messages to this address will be
delivered to dead letters. Reason: Connection refused: no further
information: /10.210.250.400:7077
15/08/01 14:09:06 WARN AppClient$ClientActor: