spark-xml data source (com.databricks.spark.xml) not working with spark 1.6

2016-01-28 Thread Deenar Toraskar
Hi

Has anyone tried using spark-xml with Spark 1.6? I cannot even get the sample
books.xml file (wget
https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
) working.
https://github.com/databricks/spark-xml

scala> val df = sqlContext.read.format("com.databricks.spark.xml").load("books.xml")

scala> df.count
res4: Long = 0


Is anyone else facing the same issue?


Deenar
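
A possible explanation, not confirmed anywhere in this thread: spark-xml treats each element named by its rowTag option as one row, and the project README gives ROW as the default. books.xml wraps its records in <book> elements, so with the default nothing matches and the count comes back as 0. A minimal sketch for spark-shell, assuming the spark-xml package is on the classpath and books.xml is in the working directory:

```scala
// Run inside spark-shell, where sqlContext is already defined.
// Assumption: the rows of interest in books.xml are the <book> elements.
val books = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book") // element name that spark-xml treats as one row
  .load("books.xml")

books.printSchema()
println(books.count())
```

If the count is still 0 with rowTag set, the cause is probably elsewhere (for example, a package build that does not match the Spark or Scala version in use).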


Re: Having issue with Spark SQL JDBC on hive table !!!

2016-01-28 Thread @Sanjiv Singh
Any help on this.

Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Wed, Jan 27, 2016 at 10:25 PM, @Sanjiv Singh 
wrote:

> Hi Ted ,
> Its typo.
>
>
> Regards
> Sanjiv Singh
> Mob :  +091 9990-447-339
>
> On Wed, Jan 27, 2016 at 9:13 PM, Ted Yu  wrote:
>
>> In the last snippet, temptable is shown by 'show tables' command.
>> Yet you queried tampTable.
>>
>> I believe this just was typo :-)
>>
>> On Wed, Jan 27, 2016 at 7:07 AM, @Sanjiv Singh 
>> wrote:
>>
>>> Hi All,
>>>
>>> I have configured Spark to query on hive table.
>>>
>>> Run the Thrift JDBC/ODBC server using below command :
>>>
>>> *cd $SPARK_HOME*
>>> *./sbin/start-thriftserver.sh --master spark://myhost:7077 --hiveconf
>>> hive.server2.thrift.bind.host=myhost --hiveconf
>>> hive.server2.thrift.port=*
>>>
>>> and also able to connect through beeline
>>>
>>> *beeline>* !connect jdbc:hive2://192.168.145.20:
>>> Enter username for jdbc:hive2://192.168.145.20:: root
>>> Enter password for jdbc:hive2://192.168.145.20:: impetus
>>> *beeline > *
>>>
>>> Queries on the Hive table return no result through Spark JDBC, but they
>>> work with the Spark HiveContext. The complete scenario is explained below.
>>>
>>> Please help me understand why Spark SQL JDBC is not returning results.
>>>
>>> Below are version details.
>>>
>>> *Hive Version  : 1.2.1*
>>> *Hadoop Version :  2.6.0*
>>> *Spark version:  1.3.1*
>>>
>>> Let me know if need other details.
>>>
>>>
>>> *Created Hive Table , insert some records and query it :*
>>>
>>> *beeline> !connect jdbc:hive2://myhost:1*
>>> Enter username for jdbc:hive2://myhost:1: root
>>> Enter password for jdbc:hive2://myhost:1: **
>>> *beeline> create table tampTable(id int ,name string ) clustered by (id)
>>> into 2 buckets stored as orc TBLPROPERTIES('transactional'='true');*
>>> *beeline> insert into table tampTable values
>>> (1,'row1'),(2,'row2'),(3,'row3');*
>>> *beeline> select name from tampTable;*
>>> name
>>> -
>>> row1
>>> row3
>>> row2
>>>
>>> *Query through SparkSQL HiveSQLContext :*
>>>
>>> SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
>>> SparkContext sc = new SparkContext(sparkConf);
>>> HiveContext hiveContext = new HiveContext(sc);
>>> DataFrame teenagers = hiveContext.sql("SELECT name FROM tampTable");
>>> List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
>>>   @Override
>>>   public String call(Row row) {
>>>     return "Name: " + row.getString(0);
>>>   }
>>> }).collect();
>>> for (String name : teenagerNames) {
>>>   System.out.println(name);
>>> }
>>> teenagers.toJavaRDD().saveAsTextFile("/tmp1");
>>> sc.stop();
>>>
>>> which is working perfectly and giving all names from table *tempTable*
>>>
>>> *Query through Spark SQL JDBC :*
>>>
>>> *beeline> !connect jdbc:hive2://myhost:*
>>> Enter username for jdbc:hive2://myhost:: root
>>> Enter password for jdbc:hive2://myhost:: **
>>> *beeline> show tables;*
>>> *temptable*
>>> *..other tables*
>>> beeline> *SELECT name FROM tampTable;*
>>>
>>> I can list the table through "show tables", but when I run the query, it
>>> either hangs or returns nothing.
>>>
>>>
>>>
>>> Regards
>>> Sanjiv Singh
>>> Mob :  +091 9990-447-339
>>>
>>
>>
>


Re: Parquet block size from spark-sql cli

2016-01-28 Thread Ted Yu
Have you tried the following (sc is SparkContext)?

sc.hadoopConfiguration.setInt("parquet.block.size", BLOCK_SIZE)

On Thu, Jan 28, 2016 at 9:16 AM, ubet  wrote:

> Can I set the Parquet block size (parquet.block.size) in spark-sql? We are
> loading about 80 table partitions in parallel on 1.5.2 and run out of memory.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-block-size-from-spark-sql-cli-tp26097.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark integration with HCatalog (specifically regarding partitions)

2016-01-28 Thread Elliot West
Is this perhaps not currently supported?

// TODO: Support persisting partitioned data source relations in Hive compatible format

From:
https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L328

On 25 January 2016 at 19:45, Elliot West  wrote:

> Thanks for your response Jorge and apologies for my delay in replying. I
> took your advice with case 5 and declared the column names explicitly
> instead of the wildcard. This did the trick and I can now add partitions to
> an existing table. I also tried removing the 'partitionBy("id")' call as
> suggested, but this gave me a NoSuchMethodError. I suspect this would work
> if I were running a newer version of the Hive metastore. Oddly I'm still
> unable to create a new partitioned table, although I have since found a
> somewhat confusing warning while running case 4:
>
> Persisting partitioned data source relation `raboof` into Hive metastore
> in Spark SQL specific format, which is NOT compatible with Hive. Input
> path(s):
>
>
> If you have any thoughts, please let me know.
>
> Thanks - Elliot.
>
>
> On 13 January 2016 at 20:56, Jorge Machado  wrote:
>
>> Hi Elliot,
>>
>> I'm no expert, but for case 5, could it be that you changed the column order
>> on the second insert?
>> And why do you call partitionBy again if the table was
>> already created with partitions?
>>
>>
>>
>> insert into table foobar PARTITION (id)
>>   > values ("xxx", 1), ("yyy", 2);
>>
>>
>>
>> hive (default)> insert into table new_record_source
>>   > values (3, "zzz");
>>
>>
>>
>> Regards
>>
>>
>> On 11/01/2016, at 13:36, Elliot West  wrote:
>>
>> Hello,
>>
>> I am in the process of evaluating Spark (1.5.2) for a wide range of use
>> cases. In particular I'm keen to understand the depth of the integration
>> with HCatalog (aka the Hive Metastore). I am very encouraged when browsing
>> the source contained within the org.apache.spark.sql.hive package. My goals
>> are to evaluate how effectively Spark handles the following scenarios:
>>
>>    1. Reading from an unpartitioned HCatalog table.
>>    2. Reading from a partitioned HCatalog table with partition pruning
>>       from filter pushdown.
>>    3. Writing to a new unpartitioned HCatalog table.
>>    4. Writing to a new partitioned HCatalog table.
>>    5. Adding a partition to a partitioned HCatalog table.
>>
>> I found that the first three cases appear to function beautifully.
>> However, I cannot seem to effectively create new HCatalog aware partitions
>> either in a new table or on an existing table (cases 4 & 5). I suspect
>> this may be due to my inexperience with Spark so wonder if you could advise
>> me on what to try next. Here's what I have:
>>
>> *Case 4: Writing to a new partitioned HCatalog table*
>>
>> Create a source in Hive (could be plain data file also):
>>
>>
>> hive (default)> create table foobar ( id int, name string );
>> hive (default)> insert into table foobar values (1, "xxx"), (2, "zzz");
>>
>> Read the source with Spark, partition the data, and write to a new table:
>>
>> sqlContext.sql("select * from foobar").write.format("orc").partitionBy("id").saveAsTable("raboof")
>>
>>
>> Check for the new table in Hive, it is partitioned correctly although the
>> formats and schema are unexpected:
>>
>> hive (default)> show table extended like 'raboof';
>> OK
>> tab_name
>> tableName: raboof
>> location:hdfs://host:port/user/hive/warehouse/raboof
>> inputformat:org.apache.hadoop.mapred.SequenceFileInputFormat
>> outputformat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>> columns:struct columns { list col}
>> partitioned:true
>> partitionColumns:struct partition_columns { i32 id}
>>
>>
>> Check for correctly partitioned data on HDFS, it appears to be there:
>>
>> [me@host]$ hdfs dfs -ls -R /user/hive/warehouse/raboof
>> /user/hive/warehouse/raboof/_SUCCESS
>> /user/hive/warehouse/raboof/id=1
>> /user/hive/warehouse/raboof/id=1/part-r-0-.orc
>> /user/hive/warehouse/raboof/id=2
>> /user/hive/warehouse/raboof/id=2/part-r-0-.orc
>>
>>
>> Something is wrong however, no data is returned from this query and the
>> column names appear incorrect:
>>
>> hive (default)> select * from default.raboof;
>> OK
>> col id
>>
>> HCatalog reports no partitions for the table:
>>
>> hive (default)> show partitions default.raboof;
>> OK
>> partition
>>
>> *Case 5: Adding a partition to a partitioned HCatalog table*
>>
>> Created partitioned source table in Hive:
>>
>> hive (default)> create table foobar ( name string )
>>   > partitioned by ( id int )
>>   > stored as orc;
>> hive (default)> insert into table foobar PARTITION (id)
>>   > values ("xxx", 1), ("yyy", 2);
>>
>>
>> Created a source for a new record to add to new_record_source:
>>
>> hive (default)> create table new_record_source ( id int, name string )
>>   > 

Re: Stream S3 server to Cassandra

2016-01-28 Thread Alexandr Dzhagriev
Hello Sateesh,

I think you can use a file stream, e.g.

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

to create a stream and then process the RDDs as you are doing now.

Thanks, Alex.


On Thu, Jan 28, 2016 at 10:56 AM, Sateesh Karuturi <
sateesh.karutu...@gmail.com> wrote:

> Hello, could anyone please help me stream XML files from an S3
> server to a Cassandra DB using Spark Streaming in Java? At present I am using
> Spark core to do that job, but the problem is that I have to run it every 15
> minutes, which is why I am looking at Spark Streaming.
>
>


Understanding Spark Task failures

2016-01-28 Thread Patrick McGloin
I am trying to understand what will happen when Spark has an exception
during processing, especially while streaming.

If I have a small code snippet like this:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  throw new Exception("User exception...")
}

If I run this I will get output like this:

[info] processed => [List(Item1)]
[error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job
streaming job 1453999278000 ms.0
[error] java.lang.Exception: User exception...
...
[info] processed => [List(Item2)]
[error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job
streaming job 1453999279000 ms.0
[error] java.lang.Exception: User exception...

First "Item1" is processed, and it fails (of course). In the next batch
"Item2" is processed. The record "Item1" has now been lost.

If I change my code so that the exception occurs inside a task:

myDStream.foreachRDD { (rdd: RDD[String]) =>
  println(s"processed => [${rdd.collect().toList}]")
  rdd.map{case x => throw new Exception("User exception...") }.collect()
}

Then the map closure will be retried, but once it has failed enough times
the record is discarded and processing continues to the next record.

Is it possible to ensure that records are not discarded, even if this means
stopping the application? I have the WAL enabled.


Parquet block size from spark-sql cli

2016-01-28 Thread ubet
Can I set the Parquet block size (parquet.block.size) in spark-sql? We are
loading about 80 table partitions in parallel on 1.5.2 and run out of memory.






Re: Having issue with Spark SQL JDBC on hive table !!!

2016-01-28 Thread @Sanjiv Singh
Adding to it

Job status in the UI:

Stage Id:                1
Description:             select ename from employeetest
                         (collect at SparkPlan.scala:84)
Submitted:               2016/01/29 04:20:06
Duration:                3.0 min
Tasks: Succeeded/Total:  0/2
Input / Output / Shuffle Read / Shuffle Write: (no values shown)

Getting below exception on Spark UI :

org.apache.spark.rdd.RDD.collect(RDD.scala:813)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:178)
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:233)
org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:344)
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1313)
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1298)
org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:55)
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)


Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Thu, Jan 28, 2016 at 9:57 PM, @Sanjiv Singh 
wrote:

> Any help on this.
>
> Regards
> Sanjiv Singh
> Mob :  +091 9990-447-339
>
> On Wed, Jan 27, 2016 at 10:25 PM, @Sanjiv Singh 
> wrote:
>
>> Hi Ted ,
>> Its typo.
>>
>>
>> Regards
>> Sanjiv Singh
>> Mob :  +091 9990-447-339
>>
>> On Wed, Jan 27, 2016 at 9:13 PM, Ted Yu  wrote:
>>
>>> In the last snippet, temptable is shown by 'show tables' command.
>>> Yet you queried tampTable.
>>>
>>> I believe this just was typo :-)
>>>

Re: Spark Distribution of Small Dataset

2016-01-28 Thread Kevin Mellott
Hi Phil,

The short answer is that there is a driver machine (which handles the
distribution of tasks and data) and a number of worker nodes (which receive
data and perform the actual tasks). That being said, certain tasks need to
be performed on the driver, because they require all of the data.

I'd recommend taking a look at the video below, which will explain this
concept in much greater detail. It also goes through an example and shows
you how to use the logging tools to understand what is happening within
your program.

https://www.youtube.com/watch?v=dmL0N3qfSc8

Thanks,
Kevin

On Thu, Jan 28, 2016 at 4:41 AM, Philip Lee  wrote:

> Hi,
>
> Simple question about Spark distribution of a small dataset.
>
> Let's say I have a cluster of 8 machines with 48 cores and 48GB of RAM.
> The dataset (ORC format, written by Hive) is small, about 1GB, but I copied it to
> HDFS.
>
> 1) If spark-sql runs on the dataset distributed on HDFS across the machines, what
> happens to the job? Does one machine handle the whole dataset because it is
> so small?
>
> 2) But the dataset is already distributed across the machines. Does
> each machine handle its part of the dataset and send it to the master
> node?
>
> Could you explain in detail how this works in a distributed setting?
>
> Best,
> Phil
>
>
>
>


Re: Dataframe, Spark SQL - Drops First 8 Characters of String on Amazon EMR

2016-01-28 Thread Jonathan Kelly
Just FYI, Spark 1.6 was released on emr-4.3.0 a couple days ago:
https://aws.amazon.com/blogs/aws/emr-4-3-0-new-updated-applications-command-line-export/
On Thu, Jan 28, 2016 at 7:30 PM Andrew Zurn  wrote:

> Hey Daniel,
>
> Thanks for the response.
>
> After playing around for a bit, it looks like it's probably something
> similar to the first situation you mentioned, with the Parquet format
> causing issues. Both a programmatically created dataset and a dataset pulled
> off the internet (rather than out of S3 and put into HDFS/Hive) behaved with
> DataFrames as one would expect (printed out everything, grouped properly,
> etc.)
>
> It looks like there is more than likely an outstanding bug that causes
> issues with data that comes from S3 and is converted to the Parquet format.
> I found an article highlighting that it was around in 1.4, and I guess it
> wouldn't be out of the realm of possibility for it to still exist. Link to
> the article:
> https://www.appsflyer.com/blog/the-bleeding-edge-spark-parquet-and-s3/
>
> Hopefully a little more stability will come out with the upcoming Spark
> 1.6 release on EMR (I think that is happening sometime soon).
>
> Thanks again for the advice on where to dig further into. Much appreciated.
>
> Andrew
>
> On Tue, Jan 26, 2016 at 9:18 AM, Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
>> Have you tried setting spark.emr.dropCharacters to a lower value? (It
>> defaults to 8.)
>>
>> :) Just joking, sorry! Fantastic bug.
>>
>> What data source do you have for this DataFrame? I could imagine for
>> example that it's a Parquet file and on EMR you are running with the wrong
>> version of the Parquet library and it messes up strings. It should be easy
>> enough to try a different data format. You could also try what happens if
>> you just create the DataFrame programmatically, e.g.
>> sc.parallelize(Seq("asdfasdfasdf")).toDF.
>>
>> To understand better at which point the characters are lost you could try
>> grouping by a string attribute. I see "education" ends up either as ""
>> (empty string) or "y" in the printed output. But are the characters already
>> lost when you try grouping by the attribute? Will there be a single ""
>> category, or will you have separate categories for "primary" and "tertiary"?
>>
>> I think the correct output through the RDD suggests that the issue
>> happens at the very end. So it will probably happen also with different
>> data sources, and grouping will create separate groups for "primary" and
>> "tertiary" even though they are printed as the same string at the end. You
>> should also check the data from "take(10)" to rule out any issues with
>> printing. You could try the same "groupBy" trick after "take(10)". Or you
>> could print the lengths of the strings.
>>
>> Good luck!
>>
>> On Tue, Jan 26, 2016 at 3:53 AM, awzurn  wrote:
>>
>>> Sorry for the bump, but wondering if anyone else has seen this before.
>>> We're
>>> hoping to either resolve this soon, or move on with further steps to move
>>> this into an issue.
>>>
>>> Thanks in advance,
>>>
>>> Andrew Zurn
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-Spark-SQL-Drops-First-8-Characters-of-String-on-Amazon-EMR-tp26022p26065.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>>
>>
>


Repartition taking place for all previous windows even after checkpointing

2016-01-28 Thread Abhishek Anand
Hi All,

Can someone help me with the following questions regarding checkpointing?

My code flow is as follows:

1) create a direct stream from Kafka
2) repartition the Kafka stream
3) mapToPair followed by reduceByKey
4) filter
5) reduceByKeyAndWindow without the inverse function
6) write to Cassandra

Now when I restart my application from a checkpoint, I see repartition and
the other steps being called for the previous windows, which takes longer and
delays my aggregations.

My understanding was that once data checkpointing is done, it should not
re-read from Kafka and should use the saved RDDs instead, but I guess I am wrong.

Is there a way to avoid the repartition, or any workaround for this?

The Spark version is 1.4.0.

Cheers !!
Abhi


Re: Databricks Cloud vs AWS EMR

2016-01-28 Thread Eran Witkon
Can you name the features that make Databricks better than Zeppelin?
Eran
On Fri, 29 Jan 2016 at 01:37 Michal Klos  wrote:

> We use both databricks and emr. We use databricks for our exploratory /
> adhoc use cases because their notebook is pretty badass and better than
> Zeppelin IMHO.
>
> We use EMR for our production machine learning and ETL tasks. The nice
> thing about EMR is you can use applications other than spark. From a "tools
> in the toolbox" perspective this is very important.
>
> M
>
> On Jan 28, 2016, at 6:05 PM, Sourav Mazumder 
> wrote:
>
> You can also try out IBM's Spark as a service in IBM Bluemix. There you'll get
> all the required features for security, multitenancy, notebooks, and
> integration with other big data services. You can try it out for free too.
>
> Regards,
> Sourav
>
> On Thu, Jan 28, 2016 at 2:10 PM, Rakesh Soni 
> wrote:
>
>> At its core, EMR just launches Spark applications, whereas Databricks is
>>> a higher-level platform that also includes multi-user support, an
>>> interactive UI, security, and job scheduling.
>>>
>>> Specifically, Databricks runs standard Spark applications inside a
>>> user’s AWS account, similar to EMR, but it adds a variety of features to
>>> create an end-to-end environment for working with Spark. These include:
>>>
>>>
>>>    - Interactive UI (includes a workspace with notebooks, dashboards, a
>>>      job scheduler, point-and-click cluster management)
>>>    - Cluster sharing (multiple users can connect to the same cluster,
>>>      saving cost)
>>>    - Security features (access controls to the whole workspace)
>>>    - Collaboration (multi-user access to the same notebook, revision
>>>      control, and IDE and GitHub integration)
>>>    - Data management (support for connecting different data sources to
>>>      Spark, caching service to speed up queries)
>>>
>>>
>>> The idea is that a lot of Spark deployments soon need to bring in
>>> multiple users, different types of jobs, etc, and we want to have these
>>> built-in. But if you just want to connect to existing data and run jobs,
>>> that also works.
>>>
>>> The cluster manager in Databricks is based on Standalone mode, not YARN,
>>> but Databricks adds several features, such as allowing multiple users to
>>> run commands on the same cluster and running multiple versions of Spark.
>>> Because Databricks is also the team that initially built Spark, the service
>>> is very up to date and integrated with the newest Spark features -- e.g.
>>> you can run previews of the next release, any data in Spark can be
>>> displayed visually, etc.
>>>
>>> *From: *Alex Nastetsky 
>>> *Subject: **Databricks Cloud vs AWS EMR*
>>> *Date: *January 26, 2016 at 11:55:41 AM PST
>>> *To: *user 
>>>
>>> As a user of AWS EMR (running Spark and MapReduce), I am interested in
>>> potential benefits that I may gain from Databricks Cloud. I was wondering
>>> if anyone has used both and done comparison / contrast between the two
>>> services.
>>>
>>> In general, which resource manager(s) does Databricks Cloud use for
>>> Spark? If it's YARN, can you also run MapReduce jobs in Databricks Cloud?
>>>
>>> Thanks.
>>>
>>> --
>>
>>
>>
>


Re: local class incompatible: stream classdesc serialVersionUID

2016-01-28 Thread Ted Yu
I am not a Scala expert.

RDD extends Serializable but doesn't have a @SerialVersionUID() annotation.
This may explain what you described.

One approach is to add @SerialVersionUID so that RDDs have a stable serial
version UID.

Cheers
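
To illustrate the annotation mentioned above, here is a minimal, Spark-free sketch showing that @SerialVersionUID pins the UID that Java serialization records for a class, so the value no longer depends on the compiled shape of the class. Payload and SerialUidDemo are made-up names for this illustration only. Whether annotating RDD itself would be enough for compatibility across Spark versions is not established in this thread.

```scala
import java.io.ObjectStreamClass

// Hypothetical class for illustration; the annotation fixes its serial version UID.
@SerialVersionUID(42L)
class Payload(val value: Int) extends Serializable

object SerialUidDemo {
  // Returns the UID that Java serialization would record for the given class.
  def uidOf(cls: Class[_]): Long =
    ObjectStreamClass.lookup(cls).getSerialVersionUID

  def main(args: Array[String]): Unit =
    println(uidOf(classOf[Payload]))
}
```

Without the annotation the UID is computed from the class structure, which is why two builds of the same class can disagree, as in the InvalidClassException quoted below.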

On Thu, Jan 28, 2016 at 1:38 PM, Jason Plurad  wrote:

> I've searched through the mailing list archive. It seems that if you try
> to run, for example, a Spark 1.5.2 program against a Spark 1.5.1 standalone
> server, you will run into an exception like this:
>
> WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in stage
> 0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
> org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
> serialVersionUID = -3343649307726848892, local class serialVersionUID =
> -3996494161745401652
>
> If my application is using a library that builds against Spark 1.5.2, does
> that mean that my application is now tied to that same Spark standalone
> server version?
>
> Is there a recommended way for that library to have a Spark dependency but
> keep it compatible against a wider set of versions, i.e. any version 1.5.x?
>
> Thanks!
>


Re: Databricks Cloud vs AWS EMR

2016-01-28 Thread Rakesh Soni
>
> At its core, EMR just launches Spark applications, whereas Databricks is a
> higher-level platform that also includes multi-user support, an interactive
> UI, security, and job scheduling.
>
> Specifically, Databricks runs standard Spark applications inside a user’s
> AWS account, similar to EMR, but it adds a variety of features to create an
> end-to-end environment for working with Spark. These include:
>
>
>    - Interactive UI (includes a workspace with notebooks, dashboards, a job
>      scheduler, point-and-click cluster management)
>    - Cluster sharing (multiple users can connect to the same cluster,
>      saving cost)
>    - Security features (access controls to the whole workspace)
>    - Collaboration (multi-user access to the same notebook, revision
>      control, and IDE and GitHub integration)
>    - Data management (support for connecting different data sources to
>      Spark, caching service to speed up queries)
>
>
> The idea is that a lot of Spark deployments soon need to bring in multiple
> users, different types of jobs, etc, and we want to have these built-in.
> But if you just want to connect to existing data and run jobs, that also
> works.
>
> The cluster manager in Databricks is based on Standalone mode, not YARN,
> but Databricks adds several features, such as allowing multiple users to
> run commands on the same cluster and running multiple versions of Spark.
> Because Databricks is also the team that initially built Spark, the service
> is very up to date and integrated with the newest Spark features -- e.g.
> you can run previews of the next release, any data in Spark can be
> displayed visually, etc.
>
> *From: *Alex Nastetsky 
> *Subject: **Databricks Cloud vs AWS EMR*
> *Date: *January 26, 2016 at 11:55:41 AM PST
> *To: *user 
>
> As a user of AWS EMR (running Spark and MapReduce), I am interested in
> potential benefits that I may gain from Databricks Cloud. I was wondering
> if anyone has used both and done comparison / contrast between the two
> services.
>
> In general, which resource manager(s) does Databricks Cloud use for Spark?
> If it's YARN, can you also run MapReduce jobs in Databricks Cloud?
>
> Thanks.
>
> --


Re: How to write a custom window function?

2016-01-28 Thread Benyi Wang
Never mind.

GenericUDAFCollectList supports struct in 1.3.0. I modified it and it works
in a tricky way.

I also found an example HiveWindowFunction.

On Thu, Jan 28, 2016 at 12:49 PM, Benyi Wang  wrote:

> I'm trying to implement a WindowFunction like collect_list, but I have to
> collect a struct. collect_list works only for primitive types.
>
> I think I might modify GenericUDAFCollectList, but haven't tried it yet.
>
> I'm wondering if there is an example showing how to write a custom
> WindowFunction in Spark SQL?
>
> Thanks.
>
>
>


Re: JSON to SQL

2016-01-28 Thread Andrés Ivaldi
Thanks for the tip. I realized that, and I ended up using explode as
you said.

This is my attempt:

var res = (df.explode("rows", "r") {
  l: WrappedArray[ArrayBuffer[String]] => l.toList
}).select("r").map { m => m.getList[Row](0) }

var u = res.map { m => Row.fromSeq(m.toSeq) }

var df1 = df.sqlContext.createDataFrame(u, getScheme(df))

It works OK, but throws an invalid cast to Integer if the schema has an
IntegerType field. It looks like a spark-csv bug, but I can work around it anyway.

Thanks for the help.
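
The target layout discussed in this thread (scalar fields kept as they are, list fields joined into one comma-separated string per row) can be sketched without Spark on plain Scala collections. This is only an illustration of the flattening rule, not the DataFrame code from the attempt above. FlattenDemo, flattenValue and flattenRow are hypothetical names, and nested objects are ignored, as in the original requirement.

```scala
object FlattenDemo {
  // Lists become one comma-separated string; any other value is rendered with toString.
  def flattenValue(v: Any): String = v match {
    case xs: Seq[_] => xs.mkString(",")
    case other      => other.toString
  }

  // Flattens every field of a record, keeping the field names.
  def flattenRow(row: Map[String, Any]): Map[String, String] =
    row.map { case (k, v) => k -> flattenValue(v) }

  def main(args: Array[String]): Unit = {
    // First sample record from the thread, written as a Scala Map.
    val record: Map[String, Any] =
      Map("a" -> 1, "b" -> Seq(2, 3), "c" -> "Field", "d" -> Seq(4, 5, 6, 7, 8))
    val flat = flattenRow(record)
    // Print the columns in a fixed order, tab-separated.
    println(Seq("a", "b", "c", "d").map(flat).mkString("\t"))
  }
}
```

In a DataFrame the same rule would have to be applied per column according to the inferred schema; the sketch only shows what each output cell should contain.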


On Thu, Jan 28, 2016 at 7:43 PM, Mohammed Guller 
wrote:

> You don’t need Hive for that. The DataFrame class has a method  named
> explode, which provides the same functionality.
>
>
>
> Here is an example from the Spark API documentation:
>
> df.explode("words", "word"){words: String => words.split(" ")}
>
>
>
> The first argument to the explode method  is the name of the input column
> and the second argument is the name of the output column.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Andrés Ivaldi [mailto:iaiva...@gmail.com]
> *Sent:* Wednesday, January 27, 2016 7:17 PM
> *To:* Cheng, Hao
> *Cc:* Sahil Sareen; Al Pivonka; user
>
> *Subject:* Re: JSON to SQL
>
>
>
> I'm using DataFrames, reading the JSON exactly as you say, and I can get
> the schema from there. Reading the documentation, I realized that it is
> possible to create a structure dynamically, so by applying some
> transformations to the DataFrame plus the new structure I'll be able to
> save the JSON to my RDBMS.
>
>
>
> For the flatten approach you mentioned, LateralView: do I need a Hive DB for
> that, or just the Spark HiveContext? I saw some examples and that is
> exactly what I need. Can you explain it a little bit more?
>
>
>
> Thanks
>
>
>
> On Wed, Jan 27, 2016 at 10:29 PM, Cheng, Hao  wrote:
>
> Have you tried the DataFrame API, e.g.
> sqlContext.read.json("/path/to/file.json")? Spark SQL will infer
> the type/schema for you automatically.
>
>
>
> And lateral view will help on the flatten issues,
>
> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView,
> as well as the “a.b[0].c” format of expression.
>
>
>
>
>
> *From:* Andrés Ivaldi [mailto:iaiva...@gmail.com]
> *Sent:* Thursday, January 28, 2016 3:39 AM
> *To:* Sahil Sareen
> *Cc:* Al Pivonka; user
> *Subject:* Re: JSON to SQL
>
>
>
> I'm really brand new with Scala, but if I'm defining a case class then is
> becouse I know how is the json's structure is previously?
>
> If I'm able to define dinamicaly a case class from the JSON structure then
> even with spark I will be able to extract the data
>
>
>
> On Wed, Jan 27, 2016 at 4:01 PM, Sahil Sareen  wrote:
>
> Isn't this just about defining a case class and using
> parse(json).extract[CaseClassName]  using Jackson?
>
> -Sahil
>
>
>
> On Wed, Jan 27, 2016 at 11:08 PM, Andrés Ivaldi 
> wrote:
>
> We dont have Domain Objects, its a service like a pipeline, data is read
> from source and they are saved it in relational Database
>
> I can read the structure from DataFrames, and do some transformations, I
> would prefer to do it with Spark to be consistent with the process
>
>
>
> On Wed, Jan 27, 2016 at 12:25 PM, Al Pivonka  wrote:
>
> Are you using an Relational Database?
>
> If so why not use a nojs DB ? then pull from it to your relational?
>
>
>
> Or utilize a library that understands Json structure like Jackson to
> obtain the data from the Json structure the persist the Domain Objects ?
>
>
>
> On Wed, Jan 27, 2016 at 9:45 AM, Andrés Ivaldi  wrote:
>
> Sure,
>
> The Job is like an etl, but without interface, so I decide the rules of
> how the JSON will be saved into a SQL Table.
>
>
>
> I need to Flatten the hierarchies where is possible in case of list
> flatten also, nested objects Won't be processed by now
>
> {"a":1,"b":[2,3],"c":"Field", "d":[4,5,6,7,8] }
> {"a":11,"b":[22,33],"c":"Field1", "d":[44,55,66,77,88] }
> {"a":111,"b":[222,333],"c":"Field2", "d":[44,55,666,777,888] }
>
> I would like something like this on my SQL table
>
> a    b        c       d
> 1    2,3      Field   4,5,6,7,8
> 11   22,33    Field1  44,55,66,77,88
> 111  222,333  Field2  444,555,,666,777,888
>
> Right now this is what i need
>
> I will later add more intelligence, like detection of list or nested
> objects and create relations in other tables.
>
>
>
>
>
>
>
> On Wed, Jan 27, 2016 at 11:25 AM, Al Pivonka  wrote:
>
> More detail is needed.
>
> Can you provide some context to the use-case ?
>
>
>
> On Wed, Jan 27, 2016 at 8:33 AM, Andrés Ivaldi  wrote:
>
> Hello, I'm trying to Save a JSON filo into SQL table.
>
> If i try to do this directly the 

Re: Spark, Mesos, Docker and S3

2016-01-28 Thread Sathish Kumaran Vairavelu
Thank you. I figured it out: I set the executor memory to the minimum and it
works.

Another issue has come up. I have to pass the --add-host option when running
containers on the slave nodes. Is there any option to pass docker run
parameters from Spark?
On Thu, Jan 28, 2016 at 12:26 PM Mao Geng  wrote:

> Sathish,
>
> I guess the mesos resources are not enough to run your job. You might want
> to check the mesos log to figure out why.
>
> I tried to run the docker image with "--conf spark.mesos.coarse=false" and
> "true". Both are fine.
>
> Best,
> Mao
>
> On Wed, Jan 27, 2016 at 5:00 PM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Hi,
>>
>> On the same Spark/Mesos/Docker setup, I am getting warning "Initial Job
>> has not accepted any resources; check your cluster UI to ensure that
>> workers are registered and have sufficient resources". I am running in
>> coarse grained mode. Any pointers on how to fix this issue? Please help. I
>> have updated both docker.properties and spark-default.conf with  
>> spark.mesos.executor.docker.image
>> and other properties.
>>
>>
>> Thanks
>>
>> Sathish
>>
>> On Wed, Jan 27, 2016 at 9:58 AM Sathish Kumaran Vairavelu <
>> vsathishkuma...@gmail.com> wrote:
>>
>>> Thanks a lot for your info! I will try this today.
>>> On Wed, Jan 27, 2016 at 9:29 AM Mao Geng  wrote:
>>>
 Hi Sathish,

 The docker image is normal, no AWS profile included.

 When the driver container runs with --net=host, the driver host's AWS
 profile will take effect so that the driver can access the protected s3
 files.

 Similarly,  Mesos slaves also run Spark executor docker container in
 --net=host mode, so that the AWS profile of Mesos slaves will take effect.

 Hope it helps,
 Mao

 On Jan 26, 2016, at 9:15 PM, Sathish Kumaran Vairavelu <
 vsathishkuma...@gmail.com> wrote:

 Hi Mao,

 I want to check on accessing the S3 from Spark docker in Mesos.  The
 EC2 instance that I am using has the AWS profile/IAM included.  Should we
 build the docker image with any AWS profile settings or --net=host docker
 option takes care of it?

 Please help


 Thanks

 Sathish

 On Tue, Jan 26, 2016 at 9:04 PM Mao Geng  wrote:

> Thank you very much, Jerry!
>
> I changed to "--jars
> /opt/spark/lib/hadoop-aws-2.7.1.jar,/opt/spark/lib/aws-java-sdk-1.7.4.jar"
> then it worked like a charm!
>
> From Mesos task logs below, I saw Mesos executor downloaded the jars
> from the driver, which is a bit unnecessary (as the docker image already
> has them), but that's ok - I am happy seeing Spark + Mesos + Docker + S3
> worked together!
>
> Thanks,
> Mao
>
> 16/01/27 02:54:45 INFO Executor: Using REPL class URI: 
> http://172.16.3.98:33771
> 16/01/27 02:55:12 INFO CoarseGrainedExecutorBackend: Got assigned task 0
> 16/01/27 02:55:12 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 16/01/27 02:55:12 INFO Executor: Fetching 
> http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar with timestamp 
> 1453863280432
> 16/01/27 02:55:12 INFO Utils: Fetching 
> http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar to 
> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/fetchFileTemp1518118694295619525.tmp
> 16/01/27 02:55:12 INFO Utils: Copying 
> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/-19880839621453863280432_cache
>  to /./hadoop-aws-2.7.1.jar
> 16/01/27 02:55:12 INFO Executor: Adding file:/./hadoop-aws-2.7.1.jar to 
> class loader
> 16/01/27 02:55:12 INFO Executor: Fetching 
> http://172.16.3.98:3850/jars/aws-java-sdk-1.7.4.jar with timestamp 
> 1453863280472
> 16/01/27 02:55:12 INFO Utils: Fetching 
> http://172.16.3.98:3850/jars/aws-java-sdk-1.7.4.jar to 
> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/fetchFileTemp8868621397726761921.tmp
> 16/01/27 02:55:12 INFO Utils: Copying 
> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/8167072821453863280472_cache
>  to /./aws-java-sdk-1.7.4.jar
> 16/01/27 02:55:12 INFO Executor: Adding file:/./aws-java-sdk-1.7.4.jar to 
> class loader
>
> On Tue, Jan 26, 2016 at 5:40 PM, Jerry Lam 
> wrote:
>
>> Hi Mao,
>>
>> Can you try --jars to include those jars?
>>
>> Best Regards,
>>
>> Jerry
>>
>> Sent from my iPhone
>>
>> On 26 Jan, 2016, at 7:02 pm, Mao Geng  wrote:
>>
>> Hi there,
>>
>> I am trying to run Spark on Mesos using a Docker image as executor,
>> as mentioned
>> http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-docker-support
>> .
>>
>> I built a docker image using the following Dockerfile (which is based
>> on
>> 

Re: spark-xml data source (com.databricks.spark.xml) not working with spark 1.6

2016-01-28 Thread Andrés Ivaldi
Hi, were you able to get it to work? Tomorrow I'll be using the XML parser
too, on Windows 7; I'll let you know the results.

Regards,



On Thu, Jan 28, 2016 at 12:27 PM, Deenar Toraskar  wrote:

> Hi
>
> Anyone tried using spark-xml with spark 1.6. I cannot even get the sample
> book.xml file (wget
> https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
> ) working
> https://github.com/databricks/spark-xml
>
> scala> val df =
> sqlContext.read.format("com.databricks.spark.xml").load("books.xml")
>
>
> scala> df.count
>
> res4: Long = 0
>
>
> Anyone else facing the same issue?
>
>
> Deenar
>



-- 
Ing. Ivaldi Andres


RE: JSON to SQL

2016-01-28 Thread Mohammed Guller
You don’t need Hive for that. The DataFrame class has a method  named explode, 
which provides the same functionality.

Here is an example from the Spark API documentation:
df.explode("words", "word"){words: String => words.split(" ")}

The first argument to the explode method  is the name of the input column and 
the second argument is the name of the output column.
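
As a rough sketch of how this might look on data shaped like the sample JSON
quoted further down in this thread (a numeric array column named b), assuming
a Spark 1.x spark-shell where sc and sqlContext are predefined. The output
column name b_item and the two in-memory records are made up for illustration:

```scala
import org.apache.spark.sql.functions.{col, explode}

// Two in-memory JSON records standing in for a real input file.
val lines = sc.parallelize(Seq(
  """{"a":1,"b":[2,3],"c":"Field"}""",
  """{"a":11,"b":[22,33],"c":"Field1"}"""))
val df = sqlContext.read.json(lines)

// DataFrame.explode: one output row per element of the array column "b".
// JSON integers are inferred as LongType, hence Seq[Long].
val viaMethod = df.explode("b", "b_item") { b: Seq[Long] => b }

// The explode function from org.apache.spark.sql.functions does the same
// inside a select, keeping only the columns listed.
val viaFunction = df.select(col("a"), explode(col("b")).as("b_item"))

viaMethod.show()
viaFunction.show()
```

Note that this produces one row per list element; joining a list into a single
comma-separated cell, as in the desired table below, is a different operation.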

Mohammed
Author: Big Data Analytics with 
Spark

From: Andrés Ivaldi [mailto:iaiva...@gmail.com]
Sent: Wednesday, January 27, 2016 7:17 PM
To: Cheng, Hao
Cc: Sahil Sareen; Al Pivonka; user
Subject: Re: JSON to SQL

I'm using DataFrames, reading the JSON exactly as you say, and I can get the
schema from there. Reading the documentation, I realized that it is possible to
create a structure dynamically, so by applying some transformations to the
DataFrame plus the new structure I'll be able to save the JSON to my RDBMS.

For the flatten approach, you mentioned LateralView. Do I need a Hive DB for
that, or just the Spark HiveContext? I saw some examples and that is exactly
what I need. Can you explain it a little bit more?

Thanks

On Wed, Jan 27, 2016 at 10:29 PM, Cheng, Hao wrote:
Have you tried the DataFrame API, e.g.
sqlContext.read.json("/path/to/file.json")? Spark SQL will infer the
type/schema for you.

A lateral view will help with the flattening,
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView, as
well as the “a.b[0].c” expression format.


From: Andrés Ivaldi [mailto:iaiva...@gmail.com]
Sent: Thursday, January 28, 2016 3:39 AM
To: Sahil Sareen
Cc: Al Pivonka; user
Subject: Re: JSON to SQL

I'm brand new to Scala, but if I'm defining a case class, isn't that
because I already know the JSON's structure in advance?

If I'm able to define a case class dynamically from the JSON structure, then
even with Spark I will be able to extract the data.

On Wed, Jan 27, 2016 at 4:01 PM, Sahil Sareen wrote:
Isn't this just about defining a case class and using 
parse(json).extract[CaseClassName]  using Jackson?

-Sahil

On Wed, Jan 27, 2016 at 11:08 PM, Andrés Ivaldi wrote:
We don't have domain objects; it's a service like a pipeline: data is read from
a source and saved in a relational database.

I can read the structure from DataFrames and do some transformations. I would
prefer to do it with Spark, to be consistent with the process.

On Wed, Jan 27, 2016 at 12:25 PM, Al Pivonka wrote:
Are you using a relational database?
If so, why not use a NoSQL DB, then pull from it to your relational one?

Or utilize a library that understands JSON structure, like Jackson, to obtain the
data from the JSON structure, then persist the domain objects?

On Wed, Jan 27, 2016 at 9:45 AM, Andrés Ivaldi wrote:
Sure,
The job is like an ETL, but without an interface, so I decide the rules for how
the JSON will be saved into a SQL table.

I need to flatten the hierarchies where possible; lists should be flattened
too. Nested objects won't be processed for now.

{"a":1,"b":[2,3],"c":"Field", "d":[4,5,6,7,8] }
{"a":11,"b":[22,33],"c":"Field1", "d":[44,55,66,77,88] }
{"a":111,"b":[222,333],"c":"Field2", "d":[44,55,666,777,888] }

I would like something like this in my SQL table:

a    b        c       d
1    2,3      Field   4,5,6,7,8
11   22,33    Field1  44,55,66,77,88
111  222,333  Field2  444,555,,666,777,888

Right now this is what I need.
I will later add more intelligence, like detection of lists or nested objects,
and create relations in other tables.



On Wed, Jan 27, 2016 at 11:25 AM, Al Pivonka wrote:
More detail is needed.
Can you provide some context to the use-case ?

On Wed, Jan 27, 2016 at 8:33 AM, Andrés Ivaldi wrote:
Hello, I'm trying to save a JSON file into a SQL table.

If I try to do this directly, an IllegalArgumentException is raised. I suppose
this is because JSON has a hierarchical structure; is that correct?

If that is the problem, how can I flatten the JSON structure? The JSON
structure to be processed is unknown, so I need to do it programmatically.

regards
--
Ing. Ivaldi Andres



--
Those who say it can't be done, are usually interrupted by those doing it.



Re: Databricks Cloud vs AWS EMR

2016-01-28 Thread Sourav Mazumder
You can also try out IBM's spark as a service in IBM Bluemix. You'll get
there all required features for security, multitenancy, notebook,
integration with other big data services. You can try that out for free too.

Regards,
Sourav

On Thu, Jan 28, 2016 at 2:10 PM, Rakesh Soni  wrote:

> At its core, EMR just launches Spark applications, whereas Databricks is a
>> higher-level platform that also includes multi-user support, an interactive
>> UI, security, and job scheduling.
>>
>> Specifically, Databricks runs standard Spark applications inside a user’s
>> AWS account, similar to EMR, but it adds a variety of features to create an
>> end-to-end environment for working with Spark. These include:
>>
>>
>>    - Interactive UI (includes a workspace with notebooks, dashboards, a
>>      job scheduler, point-and-click cluster management)
>>    - Cluster sharing (multiple users can connect to the same cluster,
>>      saving cost)
>>    - Security features (access controls to the whole workspace)
>>    - Collaboration (multi-user access to the same notebook, revision
>>      control, and IDE and GitHub integration)
>>    - Data management (support for connecting different data sources to
>>      Spark, caching service to speed up queries)
>>
>>
>> The idea is that a lot of Spark deployments soon need to bring in
>> multiple users, different types of jobs, etc, and we want to have these
>> built-in. But if you just want to connect to existing data and run jobs,
>> that also works.
>>
>> The cluster manager in Databricks is based on Standalone mode, not YARN,
>> but Databricks adds several features, such as allowing multiple users to
>> run commands on the same cluster and running multiple versions of Spark.
>> Because Databricks is also the team that initially built Spark, the service
>> is very up to date and integrated with the newest Spark features -- e.g.
>> you can run previews of the next release, any data in Spark can be
>> displayed visually, etc.
>>
>> *From: *Alex Nastetsky 
>> *Subject: **Databricks Cloud vs AWS EMR*
>> *Date: *January 26, 2016 at 11:55:41 AM PST
>> *To: *user 
>>
>> As a user of AWS EMR (running Spark and MapReduce), I am interested in
>> potential benefits that I may gain from Databricks Cloud. I was wondering
>> if anyone has used both and done comparison / contrast between the two
>> services.
>>
>> In general, which resource manager(s) does Databricks Cloud use for
>> Spark? If it's YARN, can you also run MapReduce jobs in Databricks Cloud?
>>
>> Thanks.
>>
>> --
>
>
>


streaming in 1.6.0 slower than 1.5.1

2016-01-28 Thread Jesse F Chen


I ran the same streaming application (compiled individually for 1.5.1 and
1.6.0) that processes 5-second tweet batches.

I noticed two things:

1. 10% regression in 1.6.0 vs 1.5.1

 Spark v1.6.0: 1,564 tweets/s
 Spark v1.5.1: 1,747 tweets/s

2. 1.6.0 streaming seems to have a memory leak.

In 1.6.0, processing time gradually increases and eventually exceeds 5 seconds,
so batches start to queue up.
In 1.5.1 there is no such slowdown. See chart below to see the increasing
scheduling delay in 1.6:




I captured heap dumps in both versions and compared them. I noticed the
Byte base class is using 50X more space in 1.5.1.

Here are some top classes in heap histogram and references.

Heap Histogram

All Classes (excluding platform)

1.6.0 Streaming
Class                            Instance Count  Total Size
class [B                         8453            3,227,649,599
class [C                         44682           4,255,502
class java.lang.reflect.Method   9059            1,177,670

1.5.1 Streaming
Class                            Instance Count  Total Size
class [B                         5095            62,938,466
class [C                         130482          12,844,182
class java.lang.String           130171          1,562,052


References by Type

1.6.0 Streaming: class [B [0x640039e38]
1.5.1 Streaming: class [B [0x6c020bb08]

Referrers by Type

1.6.0 Streaming
Class                                Count
java.nio.HeapByteBuffer              3239
sun.security.util.DerInputBuffer     1233
sun.security.util.ObjectIdentifier   620
[Ljava.lang.Object;                  408

1.5.1 Streaming
Class                                Count
sun.security.util.DerInputBuffer     1233
sun.security.util.ObjectIdentifier   620
[[B                                  397
java.lang.reflect.Method             326




The total size by class B is 3GB in 1.5.1 and only 60MB in 1.6.0.
The Java.nio.HeapByteBuffer referencing class did not show up in top in
1.5.1.

I have also placed jstack output for 1.5.1 and 1.6.0 online..you can get
them here

https://ibm.box.com/sparkstreaming-jstack160
https://ibm.box.com/sparkstreaming-jstack151

Jesse







Re: Understanding Spark Task failures

2016-01-28 Thread Tathagata Das
That is hard for the system to guarantee, and it is up to the app developer
to ensure that this does not happen. For example, if the data in a message is
corrupted, then unless the app code handles such data robustly, the
system will fail every time it retries that app code.
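
One way app code could be made robust to corrupt data is sketched below in
plain Scala (no Spark needed to run it). parseRecord, splitBatch and the
integer payload are hypothetical; the idea is only that a bad message becomes
a value the application can set aside, rather than an exception that fails the
task on every retry:

```scala
import scala.util.{Failure, Success, Try}

object RobustParsing {
  // Hypothetical parser: the payload is assumed to be an integer encoded as text.
  // A corrupt message becomes a Left carrying the raw input instead of throwing.
  def parseRecord(raw: String): Either[String, Int] =
    Try(raw.trim.toInt) match {
      case Success(n) => Right(n)
      case Failure(_) => Left(raw)
    }

  // Split a batch into successfully parsed values and rejected raw inputs.
  def splitBatch(batch: Seq[String]): (Seq[Int], Seq[String]) = {
    val parsed = batch.map(parseRecord)
    (parsed.collect { case Right(n) => n }, parsed.collect { case Left(r) => r })
  }
}
```

Inside a streaming job the same function could be applied with rdd.map, and
the rejected inputs written somewhere for later inspection; whether to skip
them or stop the application remains a decision for the app.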

On Thu, Jan 28, 2016 at 8:51 AM, Patrick McGloin 
wrote:

> I am trying to understand what will happen when Spark has an exception
> during processing, especially while streaming.
>
> If I have a small code snippet like this:
>
> myDStream.foreachRDD { (rdd: RDD[String]) =>
>   println(s"processed => [${rdd.collect().toList}]")
>   throw new Exception("User exception...")
> }
>
> If I run this I will get output like this:
>
> [info] processed => [List(Item1)]
> [error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job 
> streaming job 1453999278000 ms.0
> [error] java.lang.Exception: User exception...
> ...
> [info] processed => [List(Item2)]
> [error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job 
> streaming job 1453999279000 ms.0
> [error] java.lang.Exception: User exception...
>
> First "Item1" is processed, and it fails (of course). In the next batch
> "Item2" is processed. The record "Item1" has now been lost.
>
> If I change my code so that the exception occurs inside a task:
>
> myDStream.foreachRDD { (rdd: RDD[String]) =>
>   println(s"processed => [${rdd.collect().toList}]")
>   rdd.map{case x => throw new Exception("User exception...") }.collect()
> }
>
> Then the map closure will be retried, but once it has failed enough times
> the record is discarded and processing continues to the next record.
>
> Is it possible to ensure that records are not discarded, even if this
> means stopping the application? I have the WAL enabled.
>


Problems when applying scheme to RDD

2016-01-28 Thread Andrés Ivaldi
Hello, I'm getting an exception when trying to apply a new schema to an RDD.

I'm reading a JSON with Databricks spark-csv v1.3.0.


After applying some transformations I have an RDD with String-typed columns.

Then I try to apply a schema where one of the fields is Integer, and this
exception is raised:

16/01/28 17:38:14 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 4,
localhost): java.lang.ClassCastException: java.lang.String cannot be cast
to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:41)
at
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:221)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
Source)
at
org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at
org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:354)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
at scala.collection.AbstractIterator.to(Iterator.scala:1194)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1194)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

The code I'm running is like

  // Explode the "rows" column into one output row per inner list
  val res = df.explode("rows", "r") {
      l: WrappedArray[ArrayBuffer[String]] => l.toList
    }.select("r")
    .map { m => m.getList[Row](0) }

  val u = res.map { m => Row.fromSeq(m.toSeq) }

  val df1 = df.sqlContext.createDataFrame(u, getScheme(df))
  // calling df1.show here raises the exception


getScheme returns the schema; the last column is IntegerType. If I change it
to StringType and then apply the cast like this, it works:

  df1.select(df1("ga:pageviews").cast(IntegerType)).show

The order of the fields in the structure seems to be OK.
I read that early versions of spark-csv had a similar issue.
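
For context: createDataFrame(rowRDD, schema) does not convert values. Each
value in a Row is expected to already have the JVM type that the schema
declares, which would explain a ClassCastException when a String sits in an
IntegerType column. A minimal sketch of converting the strings before applying
the schema follows; coerce and coerceRow are hypothetical helpers, only a few
types are handled, and nulls or malformed numbers are not dealt with:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Convert one raw string to the JVM type that the given Spark SQL type expects.
def coerce(raw: String, dt: DataType): Any = dt match {
  case IntegerType => raw.toInt
  case LongType    => raw.toLong
  case DoubleType  => raw.toDouble
  case _           => raw // everything else is left as a string
}

// Convert a row of strings so that each value matches its schema field type.
def coerceRow(values: Seq[String], schema: StructType): Row =
  Row.fromSeq(values.zip(schema.fields).map { case (v, f) => coerce(v, f.dataType) })
```

With an RDD[Seq[String]] named stringRows (also hypothetical), this could be
used as sqlContext.createDataFrame(stringRows.map(coerceRow(_, schema)), schema).
Keeping the column as StringType and casting afterwards, as above, is an
equally valid route.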

Any Ideas?

Regards!!!

Ing. Ivaldi Andres


Re: Broadcast join on multiple dataframes

2016-01-28 Thread Michael Armbrust
Can you provide the analyzed and optimized plans (explain(true))?
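
For reference, a small sketch of how those plans can be printed, assuming a
Spark 1.x spark-shell with sqlContext predefined. The two tiny tables are
stand-ins for the real ones:

```scala
import org.apache.spark.sql.functions.broadcast

// Stand-in tables; the column name matches one of the join keys in the question.
val large = sqlContext.range(0, 1000).toDF("strategy_id")
val small = sqlContext.range(0, 10).toDF("strategy_id")

val joined = large.join(broadcast(small), "strategy_id")

// extended = true prints the parsed, analyzed and optimized logical plans
// followed by the physical plan.
joined.explain(true)
```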

On Thu, Jan 28, 2016 at 12:26 PM, Srikanth  wrote:

> Hello,
>
> I have a use case where one large table has to be joined with several
> smaller tables.
> I've added broadcast hint for all small tables in the joins.
>
> val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")
>
> val metaActionDF = sqlContext.read.format("json")
> val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
> val metaLocationDF =
> sqlContext.read.format("json").option("samplingRatio","0.1").load(metaLocationFile)
>.join(broadcast(metaActionDF),
> "campaign_id")
>.join(broadcast(cidOrgDF),
> List("organization_id"), "left_outer")
>
> val metaCreativeDF = sqlContext.read.format("json")
> val metaExchangeDF = sqlContext.read.format("json")
> val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
> val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")
>
> val joinedBidderDF = largeTableDF.as("BID")
> .join(broadcast(metaLocationDF),
> "strategy_id")
> .join(broadcast(metaCreativeDF), "creative_id")
> .join(broadcast(metaExchangeDF),
> $"exchange_id" === $"id" , "left_outer")
> .join(broadcast(techKeyDF).as("TK"),
> $"BID.tech_id" === $"TK.tech_key" , "left_outer")
> .join(broadcast(localizationDF).as("BL"),
> $"BID.language" === $"BL.id" , "left_outer")
>
> When I look at the execution plan, all the joins are marked as
> broadcastjoin.
> But when I look at the spark job UI, the DAG visualization shows that some
> joins are sortmerged with shuffle involved.
> The ones that I've highlighted in yellow were shuffled.
> DAG can be viewed here -
> https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0
>
> Why is the actual execution as seen in the DAG different from the physical
> plan pasted below.
> I'm trying not to shuffle my largeTable. Any idea what is causing this?
>
> == Physical Plan ==
>
> BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
>
> :- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
>
> :  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L],
> LeftOuter, None
>
> :  :  :- Project [...]
>
> :  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)],
> [creative_id#131L], BuildRight
>
> :  :  : :- Project [...]
>
> :  :  : :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)],
> [strategy_id#127L], BuildRight
>
> :  :  : : :- ConvertToUnsafe
>
> :  :  : : :  +- Scan
> CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false,
>
> :  :  : : +- Project [...]
>
> :  :  : :+- BroadcastHashOuterJoin [organization_id#90L],
> [cast(organization_id#102 as bigint)], LeftOuter, None
>
> :  :  : :   :- Project [...]
>
> :  :  : :   :  +- BroadcastHashJoin [campaign_id#105L],
> [campaign_id#75L], BuildRight
>
> :  :  : :   : :- Project [...]
>
> :  :  : :   : :  +- Scan
> JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths:
> file:/shared/data/t1_meta/t1_meta_strategy.jsonl
>
> :  :  : :   : +- Scan JSONRelation[] InputPaths:
> file:/shared/data/t1_meta/t1_meta_campaign.jsonl
>
> :  :  : :   +- ConvertToUnsafe
>
> :  :  : :  +- Scan
> CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
>
> :  :  : +- Scan
> JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130]
> InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
>
> :  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths:
> file:/shared/data/t1_meta/t1_meta_exchange.jsonl
>
> :  +- ConvertToUnsafe
>
> : +- Scan
> CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,
>
>
> +- ConvertToUnsafe
>
>+- Scan
> CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false
>
>
>
> Srikanth
>


local class incompatible: stream classdesc serialVersionUID

2016-01-28 Thread Jason Plurad
I've searched through the mailing list archive. It seems that if you try to
run, for example, a Spark 1.5.2 program against a Spark 1.5.1 standalone
server, you will run into an exception like this:

WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in stage
0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
serialVersionUID = -3343649307726848892, local class serialVersionUID =
-3996494161745401652

If my application is using a library that builds against Spark 1.5.2, does
that mean that my application is now tied to that same Spark standalone
server version?

Is there a recommended way for that library to have a Spark dependency but
keep it compatible against a wider set of versions, i.e. any version 1.5.x?

Thanks!


Re: streaming in 1.6.0 slower than 1.5.1

2016-01-28 Thread Ted Yu
bq. The total size by class B is 3GB in 1.5.1 and only 60MB in 1.6.0.

From the information you posted, it seems the above is backwards.

BTW [B is byte[], not class B.
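
To illustrate the naming, a small plain-Scala sketch (no Spark needed) that
prints the JVM class names of a few array types seen in the histogram. The
object and method names are made up:

```scala
object JvmArrayNames {
  // Runtime class name as the JVM (and heap dump tools) report it.
  def jvmName(x: AnyRef): String = x.getClass.getName

  def main(args: Array[String]): Unit = {
    println(jvmName(new Array[Byte](0)))      // byte[]
    println(jvmName(new Array[Char](0)))      // char[]
    println(jvmName(Array.ofDim[Byte](1, 1))) // byte[][]
    println(jvmName(new Array[AnyRef](0)))    // Object[]
  }
}
```

These print [B, [C, [[B and [Ljava.lang.Object; respectively.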

FYI

On Thu, Jan 28, 2016 at 11:49 AM, Jesse F Chen  wrote:

> I ran the same streaming application (compiled individually for 1.5.1 and
> 1.6.0) that processes 5-second tweet batches.
>
> I noticed two things:
>
> 1. 10% regression in 1.6.0 vs 1.5.1
>
> Spark v1.6.0: 1,564 tweets/s
> Spark v1.5.1: 1,747 tweets/s
>
> 2. 1.6.0 streaming seems to have a memory leak.
>
> 1.6.0, processing time gradually increases and eventually exceeds 5
> seconds so batches started to queue up.
> While in 1.5.1, no such slow down. See chart below to see the increasing
> scheduling delay in 1.6:
>
>
>
> I captured heap dumps in two version and did a comparison. I noticed the
> Byte base class is using 50X more space in 1.5.1.
>
> Here are some top classes in heap histogram and references.
>
> Heap Histogram
>
> All Classes (excluding platform)
> 1.6.0 Streaming 1.5.1 Streaming
> Class Instance Count Total Size Class Instance Count Total Size
> class [B 8453 *3,227,649,599 * class [B 5095 62,938,466
> class [C 44682 4,255,502 class [C 130482 12,844,182
> class java.lang.reflect.Method 9059 1,177,670 class java.lang.String
> 130171 1,562,052
>
>
> References by Type References by Type
>
> class [B [0x640039e38] class [B [0x6c020bb08]
>
> Referrers by Type Referrers by Type
>
> Class Count Class Count
> java.nio.HeapByteBuffer *3239* sun.security.util.DerInputBuffer 1233
> sun.security.util.DerInputBuffer 1233 sun.security.util.ObjectIdentifier
> 620
> sun.security.util.ObjectIdentifier 620 [[B 397
> [Ljava.lang.Object; 408 java.lang.reflect.Method 326
>
>
> 
>
> The total size by class B is 3GB in 1.5.1 and only 60MB in 1.6.0.
> The Java.nio.HeapByteBuffer referencing class did not show up in top in
> 1.5.1.
>
> I have also placed jstack output for 1.5.1 and 1.6.0 online..you can get
> them here
>
> https://ibm.box.com/sparkstreaming-jstack160
> https://ibm.box.com/sparkstreaming-jstack151
>
> Jesse
>
>
>
>
>
>
>


Re: streaming in 1.6.0 slower than 1.5.1

2016-01-28 Thread Shixiong(Ryan) Zhu
Hey Jesse,

Could you provide the operators you are using?

As for the heap dump, it may not be a real memory leak. Since batches started
to queue up, the memory usage should increase.

On Thu, Jan 28, 2016 at 11:54 AM, Ted Yu  wrote:

> bq. The total size by class B is 3GB in 1.5.1 and only 60MB in 1.6.0.
>
> From the information you posted, it seems the above is backwards.
>
> BTW [B is byte[], not class B.
>
> FYI
>
> On Thu, Jan 28, 2016 at 11:49 AM, Jesse F Chen  wrote:
>
>> I ran the same streaming application (compiled individually for 1.5.1 and
>> 1.6.0) that processes 5-second tweet batches.
>>
>> I noticed two things:
>>
>> 1. 10% regression in 1.6.0 vs 1.5.1
>>
>> Spark v1.6.0: 1,564 tweets/s
>> Spark v1.5.1: 1,747 tweets/s
>>
>> 2. 1.6.0 streaming seems to have a memory leak.
>>
>> 1.6.0, processing time gradually increases and eventually exceeds 5
>> seconds so batches started to queue up.
>> While in 1.5.1, no such slow down. See chart below to see the increasing
>> scheduling delay in 1.6:
>>
>>
>>
>> I captured heap dumps in two version and did a comparison. I noticed the
>> Byte base class is using 50X more space in 1.5.1.
>>
>> Here are some top classes in heap histogram and references.
>>
>> Heap Histogram
>>
>> All Classes (excluding platform)
>> 1.6.0 Streaming 1.5.1 Streaming
>> Class Instance Count Total Size Class Instance Count Total Size
>> class [B 8453 *3,227,649,599 * class [B 5095 62,938,466
>> class [C 44682 4,255,502 class [C 130482 12,844,182
>> class java.lang.reflect.Method 9059 1,177,670 class java.lang.String
>> 130171 1,562,052
>>
>>
>> References by Type References by Type
>>
>> class [B [0x640039e38] class [B [0x6c020bb08]
>>
>> Referrers by Type Referrers by Type
>>
>> Class Count Class Count
>> java.nio.HeapByteBuffer *3239* sun.security.util.DerInputBuffer 1233
>> sun.security.util.DerInputBuffer 1233 sun.security.util.ObjectIdentifier
>> 620
>> sun.security.util.ObjectIdentifier 620 [[B 397
>> [Ljava.lang.Object; 408 java.lang.reflect.Method 326
>>
>>
>> 
>>
>> The total size by class B is 3GB in 1.5.1 and only 60MB in 1.6.0.
>> The Java.nio.HeapByteBuffer referencing class did not show up in top in
>> 1.5.1.
>>
>> I have also placed jstack output for 1.5.1 and 1.6.0 online..you can get
>> them here
>>
>> https://ibm.box.com/sparkstreaming-jstack160
>> https://ibm.box.com/sparkstreaming-jstack151
>>
>> Jesse
>>
>>
>>
>>
>>
>>
>>
>


Broadcast join on multiple dataframes

2016-01-28 Thread Srikanth
Hello,

I have a use case where one large table has to be joined with several
smaller tables.
I've added broadcast hint for all small tables in the joins.

val largeTableDF = sqlContext.read.format("com.databricks.spark.csv")

val metaActionDF = sqlContext.read.format("json")
val cidOrgDF = sqlContext.read.format("com.databricks.spark.csv")
val metaLocationDF =
sqlContext.read.format("json").option("samplingRatio","0.1").load(metaLocationFile)
   .join(broadcast(metaActionDF),
"campaign_id")
   .join(broadcast(cidOrgDF),
List("organization_id"), "left_outer")

val metaCreativeDF = sqlContext.read.format("json")
val metaExchangeDF = sqlContext.read.format("json")
val localizationDF = sqlContext.read.format("com.databricks.spark.csv")
val techKeyDF = sqlContext.read.format("com.databricks.spark.csv")

val joinedBidderDF = largeTableDF.as("BID")
  .join(broadcast(metaLocationDF), "strategy_id")
  .join(broadcast(metaCreativeDF), "creative_id")
  .join(broadcast(metaExchangeDF), $"exchange_id" === $"id", "left_outer")
  .join(broadcast(techKeyDF).as("TK"), $"BID.tech_id" === $"TK.tech_key", "left_outer")
  .join(broadcast(localizationDF).as("BL"), $"BID.language" === $"BL.id", "left_outer")

When I look at the execution plan, all the joins are marked as
broadcast joins.
But when I look at the Spark job UI, the DAG visualization shows that some
joins are sort-merge joins with a shuffle involved.
The ones that I've highlighted in yellow were shuffled.
The DAG can be viewed here -
https://www.dropbox.com/s/mnxu5h067uyzdaj/DAG.PNG?dl=0

Why is the actual execution as seen in the DAG different from the physical
plan pasted below?
I'm trying not to shuffle my largeTable. Any idea what is causing this?

== Physical Plan ==
BroadcastHashOuterJoin [language#61], [id#167], LeftOuter, None
:- BroadcastHashOuterJoin [tech_id#59], [tech_key#170], LeftOuter, None
:  :- BroadcastHashOuterJoin [cast(exchange_id#13 as bigint)], [id#143L], LeftOuter, None
:  :  :- Project [...]
:  :  :  +- BroadcastHashJoin [cast(creative_id#9 as bigint)], [creative_id#131L], BuildRight
:  :  : :- Project [...]
:  :  : :  +- BroadcastHashJoin [cast(strategy_id#10 as bigint)], [strategy_id#127L], BuildRight
:  :  : : :- ConvertToUnsafe
:  :  : : :  +- Scan CsvRelation(,Some(file:///shared/data/bidder/*.lzo),false,
:  :  : : +- Project [...]
:  :  : :+- BroadcastHashOuterJoin [organization_id#90L], [cast(organization_id#102 as bigint)], LeftOuter, None
:  :  : :   :- Project [...]
:  :  : :   :  +- BroadcastHashJoin [campaign_id#105L], [campaign_id#75L], BuildRight
:  :  : :   : :- Project [...]
:  :  : :   : :  +- Scan JSONRelation[id#112L,name#115,campaign_id#105L] InputPaths: file:/shared/data/t1_meta/t1_meta_strategy.jsonl
:  :  : :   : +- Scan JSONRelation[] InputPaths: file:/shared/data/t1_meta/t1_meta_campaign.jsonl
:  :  : :   +- ConvertToUnsafe
:  :  : :  +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/cid-orgs.txt),false,,,
:  :  : +- Scan JSONRelation[creative_id#131L,creative_name#132,concept_id#129L,concept_name#130] InputPaths: file:/shared/data/t1_meta/t1_meta_creative.jsonl
:  :  +- Scan JSONRelation[description#142,id#143L,name#144] InputPaths: file:/shared/data/t1_meta/t1_meta_exchange.jsonl
:  +- ConvertToUnsafe
: +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/technology_key.txt),false,
+- ConvertToUnsafe
   +- Scan CsvRelation(,Some(file:///shared/data/t1_meta/browser_languages.osv),false



Srikanth
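
For anyone trying to reproduce this on a smaller scale, here is a minimal sketch of how one might check which join strategy the planner picks for a hinted join. It assumes Spark 1.5 or later running in local mode. The two tiny DataFrames and the object name BroadcastPlanCheck are made up for illustration and are not the tables from this thread, so it only demonstrates the inspection step, not the cause of the shuffle.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

// Hypothetical stand-alone check, not the job from the thread.
object BroadcastPlanCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BroadcastPlanCheck").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Stand-ins for the large table and one small lookup table.
    val large = sc.parallelize(1 to 1000).map(i => (i, i % 10)).toDF("id", "strategy_id")
    val small = sc.parallelize(0 until 10).map(i => (i, s"strategy-$i")).toDF("strategy_id", "name")

    // Same hint style as in the original post.
    val joined = large.join(broadcast(small), "strategy_id")

    // Prints the logical and physical plans, so the chosen join operator is visible.
    joined.explain(true)

    // The executed plan can also be inspected programmatically.
    println(joined.queryExecution.executedPlan)

    sc.stop()
  }
}
```

Adding the joins one at a time and printing the plan after each step may help locate the join where the plan and the DAG start to disagree.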


How to write a custom window function?

2016-01-28 Thread Benyi Wang
I'm trying to implement a WindowFunction like collect_list, but I have to
collect a struct. collect_list works only for primitive types.

I think I might modify GenericUDAFCollectList, but I haven't tried it yet.

I'm wondering if there is an example showing how to write a custom
WindowFunction in Spark SQL.

Thanks.


Re: Understanding Spark Task failures

2016-01-28 Thread Patrick McGloin
Hi Tathagata,

Thanks for the response.  I can add a try/catch myself and handle user
exceptions, that's true, so maybe my example wasn't a very good one.  I'm
more worried about OOM exceptions and other run-time exceptions (that could
happen outside my try/catch).

For example, I have this periodic "java.io.IOException: Class not found"
exception at the moment:

https://forums.databricks.com/questions/6601/javaioioexception-class-not-found-on-long-running.html

After this happens I lose data even though I have the WAL set up.  With the
WAL I can ensure that the data is safely stored when it has come into the
system from an external source, and I only ACK the external source after it
has been stored.  But it seems that there is no guarantee that the data is
successfully processed?

I assume I am right in saying that I am losing data even with the WAL
set up correctly.  The WAL works when stopping and starting the application,
etc.  But something is not handling the run time exception well.  This was
the start of my investigation into what is going wrong, so of course there
could be another reason for what I'm seeing.
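
As an illustration of the "stop rather than drop" idea discussed here, the following is a rough sketch only, not a confirmed recommendation from anyone on this thread. It catches a failed batch inside foreachRDD, records the error, and lets the main thread stop the StreamingContext. The queue-based input, the object name StopOnFailure and the failing record are all assumptions made for the example. Whether the failed batch is replayed after a restart depends on checkpointing and the WAL, which this sketch does not configure.

```scala
import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example: halt the stream on the first failed batch.
object StopOnFailure {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StopOnFailure").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Assumed input: two batches fed from an in-memory queue.
    val batches = mutable.Queue[RDD[String]](
      ssc.sparkContext.parallelize(Seq("Item1")),
      ssc.sparkContext.parallelize(Seq("Item2")))
    val myDStream = ssc.queueStream(batches)

    // Holds the first error seen by the driver-side batch handler.
    val failure = new AtomicReference[Throwable](null)

    myDStream.foreachRDD { (rdd: RDD[String]) =>
      try {
        // Task-side work; a failure here surfaces on the driver once retries are exhausted.
        rdd.foreach { record =>
          if (record == "Item1") throw new Exception("User exception...")
        }
      } catch {
        case NonFatal(e) => failure.set(e)
      }
    }

    ssc.start()
    // Poll until either the context terminates or a batch has failed.
    while (failure.get() == null && !ssc.awaitTerminationOrTimeout(500)) {}
    if (failure.get() != null) {
      println(s"Stopping after failure: ${failure.get().getMessage}")
      ssc.stop(stopSparkContext = true, stopGracefully = false)
    }
  }
}
```

Note that NonFatal deliberately does not match OutOfMemoryError, so a driver-side OOM would still need separate handling.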



On 28 January 2016 at 21:43, Tathagata Das 
wrote:

> That is hard to guarantee by the system, and it is up to the app developer
> to ensure that this is not . For example, if the data in a message is
> corrupted, unless the app code is robust towards handling such data, the
> system will fail every time it retries that app code.
>
> On Thu, Jan 28, 2016 at 8:51 AM, Patrick McGloin <
> mcgloin.patr...@gmail.com> wrote:
>
>> I am trying to understand what will happen when Spark has an exception
>> during processing, especially while streaming.
>>
>> If I have a small code snippet like this:
>>
>> myDStream.foreachRDD { (rdd: RDD[String]) =>
>>   println(s"processed => [${rdd.collect().toList}]")
>>   throw new Exception("User exception...")
>> }
>>
>> If I run this I will get output like this:
>>
>> [info] processed => [List(Item1)]
>> [error] 28-01-2016 17:41:18 ERROR JobScheduler:96 - Error running job 
>> streaming job 1453999278000 ms.0
>> [error] java.lang.Exception: User exception...
>> ...
>> [info] processed => [List(Item2)]
>> [error] 28-01-2016 17:41:19 ERROR JobScheduler:96 - Error running job 
>> streaming job 1453999279000 ms.0
>> [error] java.lang.Exception: User exception...
>>
>> First "Item1" is processed, and it fails (of course). In the next batch
>> "Item2" is processed. The record "Item1" has now been lost.
>>
>> If I change my code so that the exception occurs inside a task:
>>
>> myDStream.foreachRDD { (rdd: RDD[String]) =>
>>   println(s"processed => [${rdd.collect().toList}]")
>>   rdd.map{case x => throw new Exception("User exception...") }.collect()
>> }
>>
>> Then the map closure will be retried, but once it has failed enough times
>> the record is discarded and processing continues to the next record.
>>
>> Is it possible to ensure that records are not discarded, even if this
>> means stopping the application? I have the WAL enabled.
>>
>
>


Re: Databricks Cloud vs AWS EMR

2016-01-28 Thread Michal Klos
We use both databricks and emr. We use databricks for our exploratory / adhoc 
use cases because their notebook is pretty badass and better than Zeppelin IMHO.

We use EMR for our production machine learning and ETL tasks. The nice thing 
about EMR is you can use applications other than spark. From a "tools in the 
toolbox" perspective this is very important.

M

> On Jan 28, 2016, at 6:05 PM, Sourav Mazumder  
> wrote:
> 
> You can also try out IBM's spark as a service in IBM Bluemix. You'll get 
> there all required features for security, multitenancy, notebook, integration 
> with other big data services. You can try that out for free too.
> 
> Regards,
> Sourav
> 
> On Thu, Jan 28, 2016 at 2:10 PM, Rakesh Soni  wrote:
 At its core, EMR just launches Spark applications, whereas Databricks is a 
 higher-level platform that also includes multi-user support, an 
 interactive UI, security, and job scheduling.
 
 Specifically, Databricks runs standard Spark applications inside a user’s 
 AWS account, similar to EMR, but it adds a variety of features to create 
 an end-to-end environment for working with Spark. These include:
 
 - Interactive UI (includes a workspace with notebooks, dashboards, a job
   scheduler, point-and-click cluster management)
 - Cluster sharing (multiple users can connect to the same cluster, saving
   cost)
 - Security features (access controls to the whole workspace)
 - Collaboration (multi-user access to the same notebook, revision control,
   and IDE and GitHub integration)
 - Data management (support for connecting different data sources to Spark,
   caching service to speed up queries)
 
 The idea is that a lot of Spark deployments soon need to bring in multiple 
 users, different types of jobs, etc, and we want to have these built-in. 
 But if you just want to connect to existing data and run jobs, that also 
 works.
 
 The cluster manager in Databricks is based on Standalone mode, not YARN, 
 but Databricks adds several features, such as allowing multiple users to 
 run commands on the same cluster and running multiple versions of Spark. 
 Because Databricks is also the team that initially built Spark, the 
 service is very up to date and integrated with the newest Spark features 
 -- e.g. you can run previews of the next release, any data in Spark can be 
 displayed visually, etc.
 
 From: Alex Nastetsky 
 Subject: Databricks Cloud vs AWS EMR
 Date: January 26, 2016 at 11:55:41 AM PST
 To: user 
 
 As a user of AWS EMR (running Spark and MapReduce), I am interested in 
 potential benefits that I may gain from Databricks Cloud. I was wondering 
 if anyone has used both and done comparison / contrast between the two 
 services.
 
 In general, which resource manager(s) does Databricks Cloud use for Spark? 
 If it's YARN, can you also run MapReduce jobs in Databricks Cloud?
 
 Thanks.
>> --
> 


building spark 1.6.0 fails

2016-01-28 Thread Carlile, Ken
I am attempting to build Spark 1.6.0 from source on EL 6.3, using Oracle jdk
1.8.0.45, Python 2.7.6, and Scala 2.10.3. When I try to issue build/mvn
-DskipTests clean package, I get the following:

[INFO] Using zinc server for incremental compilation
[info] Compiling 3 Java sources to 
/misc/local/spark-versions/spark-1.6.0-patched/spark-1.6.0/tags/target/scala-2.10/classes...
[error] javac: invalid source release: 1.7
[error] Usage: javac <options> <source files>
[error] use -help for a list of possible options
[error] Compile failed at Jan 28, 2016 8:56:36 AM [0.113s]

I tried changing the pom.xml to have java version as 1.8, but I just got the 
same error with invalid source release: 1.8 instead of 1.7. 

My java -version and javac -version are reporting as 1.8.0.45, and I have the 
JAVA_HOME env set. Anyone have any ideas? 

Incidentally, building 2.0.0 from source worked fine… 

Thanks, 
Ken

Spark Caching Kafka Metadata

2016-01-28 Thread asdf zxcv
Does Spark cache which kafka topics exist? A service incorrectly assumes
all the relevant topics exist, even if they are empty, causing it to fail.
Fortunately the service is automatically restarted and by default, kafka
creates the topic after it is requested.

I'm trying to create the topic if it doesn't exist using
AdminUtils.createTopic:

  val zkClient = new ZkClient("localhost:2181", 1, 1, ZKStringSerializer)
  while (!AdminUtils.topicExists(zkClient, topic)) {
    AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties())
  }

But I still get an "Error getting partition metadata for 'topic-name'. Does
the topic exist?" error when I execute KafkaUtils.createDirectStream.

I've also tried to implement a retry with a wait such that the retry should
occur after Kafka has created the requested topic with
auto.create.topics.enable = true, but this still doesn't work consistently.

This is a bit frustrating to debug as well, since the topic is successfully
created about 50% of the time; other times I get the message "Does the topic
exist?". My thinking is that Spark may be caching the list of extant Kafka
topics, ignoring that I've added a new one. Is this the case? Am I missing
something?


Ben
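
For reference, the "retry with a wait" mentioned above could look roughly like the sketch below. It is plain Scala with no Kafka or Spark dependency. The helper name retry, the attempt count and the doubling backoff are assumptions for illustration, and it says nothing about whether Spark caches topic metadata.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper: retry an operation with an exponentially growing wait.
object Retry {
  @tailrec
  def retry[T](attemptsLeft: Int, waitMs: Long)(op: => T): T =
    Try(op) match {
      case Success(value) => value
      case Failure(_) if attemptsLeft > 1 =>
        // Give the external system (e.g. topic creation) time to settle.
        Thread.sleep(waitMs)
        retry(attemptsLeft - 1, waitMs * 2)(op)
      case Failure(error) =>
        // Out of attempts: surface the last error to the caller.
        throw error
    }
}
```

The call that creates the direct stream could then be wrapped as Retry.retry(5, 200) { ... }, so that a metadata error raised shortly after topic creation is retried a few times before the service gives up.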


Data not getting printed in Spark Streaming with print().

2016-01-28 Thread satyajit vegesna
Hi All,

I am trying to run HdfsWordCount example from github.

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala

I am using Ubuntu to run the program, but I don't see any data getting printed
after:
---
Time: 145402680 ms
---

I don't see any errors; the program just runs, but I do not see any output
of the data corresponding to the file used.

object HdfsStream {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("SpoolDirSpark").setMaster("local[5]")
    val ssc = new StreamingContext(sparkConf, Minutes(10))

    //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark"
    //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark/test.txt"
    val inputDirectory = "file:///home/satyajit/jsondata/"

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory)
      .map { case (x, y) => (x.toString, y.toString) }
    //lines.saveAsTextFiles("hdfs://localhost:9000/SpoolDirSpark/datacheck")
    lines.saveAsTextFiles("file:///home/satyajit/jsondata/")

    println("check_data" + lines.print())

    ssc.start()
    ssc.awaitTermination()
  }
}

I would like to know if there is any workaround, or if there is something I
am missing.

Thanks in advance,
Satyajit.


Re: Data not getting printed in Spark Streaming with print().

2016-01-28 Thread Shixiong(Ryan) Zhu
fileStream has a parameter "newFilesOnly". By default it is true, which means
only new files are processed and files already in the directory are ignored.
So you need to ***move*** the files into the directory; otherwise it will
ignore existing files.

You can also set "newFilesOnly" to false. Then in the first batch, it will
process all existing files.
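
A minimal sketch of the second option, using the three-argument fileStream overload that takes a path filter and the newFilesOnly flag. The object name ExistingFilesStream, the 10 second batch interval and the dot-file filter are assumptions for the example. The input directory is the one from the original post. Depending on the Spark version, files with old modification times may still be skipped, so treat this as a starting point to verify rather than a guaranteed fix.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example of reading files that already exist in the directory.
object ExistingFilesStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ExistingFilesStream").setMaster("local[2]")
    // Short batch interval (an assumption) so output shows up quickly.
    val ssc = new StreamingContext(conf, Seconds(10))

    val inputDirectory = "file:///home/satyajit/jsondata/"

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      inputDirectory,
      (path: Path) => !path.getName.startsWith("."), // skip hidden/temporary files
      newFilesOnly = false                            // also consider existing files
    ).map { case (_, text) => text.toString }

    // print() registers an output operation; it returns Unit, so call it on its own.
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Output is only produced once per batch interval, so with a long interval such as Minutes(10) nothing is printed until the first batch completes.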

On Thu, Jan 28, 2016 at 4:22 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> HI All,
>
> I am trying to run HdfsWordCount example from github.
>
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
>
> i am using ubuntu to run the program, but dont see any data getting
> printed after ,
> ---
> Time: 145402680 ms
> ---
>
> I dont see any errors, the program just runs, but i do not see any output
> of the data corresponding to the file used.
>
> object HdfsStream {
>
>   def main(args:Array[String]): Unit = {
>
> val sparkConf = new 
> SparkConf().setAppName("SpoolDirSpark").setMaster("local[5]")
> val ssc = new StreamingContext(sparkConf, Minutes(10))
>
> //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark"
> //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark/test.txt"
> val inputDirectory = "file:///home/satyajit/jsondata/"
>
> val lines = 
> ssc.fileStream[LongWritable,Text,TextInputFormat](inputDirectory).map{case(x,y)=>
>  (x.toString,y.toString)}
> //lines.saveAsTextFiles("hdfs://localhost:9000/SpoolDirSpark/datacheck")
> lines.saveAsTextFiles("file:///home/satyajit/jsondata/")
>
> println("check_data"+lines.print())
>
> ssc.start()
> ssc.awaitTermination()
>
> Would like to know if there is any workaround, or if there is something i
> am missing.
>
> Thanking in advance,
> Satyajit.
>


Re: Spark, Mesos, Docker and S3

2016-01-28 Thread Mao Geng
From my limited knowledge, only limited options such as network mode,
volumes, portmaps can be passed through. See
https://github.com/apache/spark/pull/3074/files.

https://issues.apache.org/jira/browse/SPARK-8734 is open for exposing all
docker options to spark.

-Mao

On Thu, Jan 28, 2016 at 1:55 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Thank you. I figured it out. I set executor memory to a minimal value and it
> works.
>
> Another issue has come up: I have to pass the --add-host option while running
> containers on slave nodes. Is there any option to pass docker run
> parameters from Spark?
> On Thu, Jan 28, 2016 at 12:26 PM Mao Geng  wrote:
>
>> Sathish,
>>
>> I guess the mesos resources are not enough to run your job. You might
>> want to check the mesos log to figure out why.
>>
>> I tried to run the docker image with "--conf spark.mesos.coarse=false"
>> and "true". Both are fine.
>>
>> Best,
>> Mao
>>
>> On Wed, Jan 27, 2016 at 5:00 PM, Sathish Kumaran Vairavelu <
>> vsathishkuma...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> On the same Spark/Mesos/Docker setup, I am getting warning "Initial Job
>>> has not accepted any resources; check your cluster UI to ensure that
>>> workers are registered and have sufficient resources". I am running in
>>> coarse grained mode. Any pointers on how to fix this issue? Please help. I
>>> have updated both docker.properties and spark-default.conf with  
>>> spark.mesos.executor.docker.image
>>> and other properties.
>>>
>>>
>>> Thanks
>>>
>>> Sathish
>>>
>>> On Wed, Jan 27, 2016 at 9:58 AM Sathish Kumaran Vairavelu <
>>> vsathishkuma...@gmail.com> wrote:
>>>
 Thanks a lot for your info! I will try this today.
 On Wed, Jan 27, 2016 at 9:29 AM Mao Geng  wrote:

> Hi Sathish,
>
> The docker image is normal, no AWS profile included.
>
> When the driver container runs with --net=host, the driver host's AWS
> profile will take effect so that the driver can access the protected s3
> files.
>
> Similarly,  Mesos slaves also run Spark executor docker container in
> --net=host mode, so that the AWS profile of Mesos slaves will take effect.
>
> Hope it helps,
> Mao
>
> On Jan 26, 2016, at 9:15 PM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
> Hi Mao,
>
> I want to check on accessing the S3 from Spark docker in Mesos.  The
> EC2 instance that I am using has the AWS profile/IAM included.  Should we
> build the docker image with any AWS profile settings or --net=host docker
> option takes care of it?
>
> Please help
>
>
> Thanks
>
> Sathish
>
> On Tue, Jan 26, 2016 at 9:04 PM Mao Geng  wrote:
>
>> Thank you very much, Jerry!
>>
>> I changed to "--jars
>> /opt/spark/lib/hadoop-aws-2.7.1.jar,/opt/spark/lib/aws-java-sdk-1.7.4.jar"
>> then it worked like a charm!
>>
>> From Mesos task logs below, I saw Mesos executor downloaded the jars
>> from the driver, which is a bit unnecessary (as the docker image already
>> has them), but that's ok - I am happy seeing Spark + Mesos + Docker + S3
>> worked together!
>>
>> Thanks,
>> Mao
>>
>> 16/01/27 02:54:45 INFO Executor: Using REPL class URI: 
>> http://172.16.3.98:33771
>> 16/01/27 02:55:12 INFO CoarseGrainedExecutorBackend: Got assigned task 0
>> 16/01/27 02:55:12 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
>> 16/01/27 02:55:12 INFO Executor: Fetching 
>> http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar with timestamp 
>> 1453863280432
>> 16/01/27 02:55:12 INFO Utils: Fetching 
>> http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar to 
>> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/fetchFileTemp1518118694295619525.tmp
>> 16/01/27 02:55:12 INFO Utils: Copying 
>> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/-19880839621453863280432_cache
>>  to /./hadoop-aws-2.7.1.jar
>> 16/01/27 02:55:12 INFO Executor: Adding file:/./hadoop-aws-2.7.1.jar to 
>> class loader
>> 16/01/27 02:55:12 INFO Executor: Fetching 
>> http://172.16.3.98:3850/jars/aws-java-sdk-1.7.4.jar with timestamp 
>> 1453863280472
>> 16/01/27 02:55:12 INFO Utils: Fetching 
>> http://172.16.3.98:3850/jars/aws-java-sdk-1.7.4.jar to 
>> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/fetchFileTemp8868621397726761921.tmp
>> 16/01/27 02:55:12 INFO Utils: Copying 
>> /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/8167072821453863280472_cache
>>  to /./aws-java-sdk-1.7.4.jar
>> 16/01/27 02:55:12 INFO Executor: Adding file:/./aws-java-sdk-1.7.4.jar 
>> to class loader
>>
>> On Tue, Jan 26, 2016 at 5:40 PM, Jerry Lam 
>> wrote:
>>
>>> Hi Mao,
>>>
>>> Can you try --jars to include those jars?
>>>
>>> Best Regards,

Getting Exceptions/WARN during random runs for same dataset

2016-01-28 Thread Khusro Siddiqui
Hi Everyone,

Environment used: Datastax Enterprise 4.8.3 which is bundled with Spark
1.4.1 and scala 2.10.5.

I am using Dataframes to query Cassandra, do processing and store the
result back into Cassandra. The job is being submitted using spark-submit
on a cluster of 3 nodes. While doing so I get three WARN messages:

WARN  2016-01-28 19:08:18 org.apache.spark.scheduler.TaskSetManager: Lost
task 99.0 in stage 2.0 (TID 107, 10.2.1.82): java.io.InvalidClassException:
org.apache.spark.sql.types.TimestampType$; unable to create instance

Caused by: java.lang.reflect.InvocationTargetException

Caused by: java.lang.UnsupportedOperationException: tail of empty list


For example, if I am running the same job, for the same input set of data,
say 20 times,

- 11 times it will run successfully without any WARN messages

- 4 times it will run successfully with the above messages

- 6 times it will run successfully by randomly giving one or two of the
exceptions above


In all 20 runs, the output data comes out as expected with no errors. My
concern is why these messages appear only on some spark-submit runs rather
than every time. Also, the stack trace does not point to any specific line
in my code. The full stack trace follows. Please let me know if you need
any other information.


WARN  2016-01-28 19:08:24 org.apache.spark.scheduler.TaskSetManager: Lost
task 188.0 in stage 16.0 (TID 637, 10.2.1.82):
java.io.InvalidClassException: org.apache.spark.sql.types.TimestampType$;
unable to create instance

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1788)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)

at scala.collection.immutable.$colon$colon.readObject(List.scala:362)

at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)

at scala.collection.immutable.$colon$colon.readObject(List.scala:362)

at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at 

Re: building spark 1.6.0 fails

2016-01-28 Thread Ted Yu
I tried the following command:

build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.4
-Dhadoop.version=2.7.0 package -DskipTests

I didn't encounter the error you mentioned.

bq. Using zinc server for incremental compilation

Is it possible that zinc was already running before you started the build?
Can you stop the zinc server and try again?

Cheers


On Thu, Jan 28, 2016 at 5:24 PM, Carlile, Ken 
wrote:

> I am attempting to build Spark 1.6.0 from source on EL 6.3, using Oracle
> jdk 1.8.0.45, Python 2.7.6, and Scala 2.10.3. When I try to issue
> build/mvn -DskipTests clean package, I get the following:
>
> [INFO] Using zinc server for incremental compilation
> [info] Compiling 3 Java sources to
> /misc/local/spark-versions/spark-1.6.0-patched/spark-1.6.0/tags/target/scala-2.10/classes...
> [error] javac: invalid source release: 1.7
> [error] Usage: javac <options> <source files>
> [error] use -help for a list of possible options
> [error] Compile failed at Jan 28, 2016 8:56:36 AM [0.113s]
>
> I tried changing the pom.xml to have java version as 1.8, but I just got
> the same error with invalid source release: 1.8 instead of 1.7.
>
> My java -version and javac -version are reporting as 1.8.0.45, and I have
> the JAVA_HOME env set. Anyone have any ideas?
>
> Incidentally, building 2.0.0 from source worked fine…
>
> Thanks,
> Ken


Re: Getting Exceptions/WARN during random runs for same dataset

2016-01-28 Thread Khusro Siddiqui
It is happening on random executors on random nodes, not on any specific
node every time.
On some runs it does not happen at all.

On Thu, Jan 28, 2016 at 7:42 PM, Ted Yu  wrote:

> Did the UnsupportedOperationExceptions happen on the executors on all the
> nodes or only one node?
>
> Thanks
>
> On Thu, Jan 28, 2016 at 5:13 PM, Khusro Siddiqui 
> wrote:
>
>> Hi Everyone,
>>
>> Environment used: Datastax Enterprise 4.8.3 which is bundled with Spark
>> 1.4.1 and scala 2.10.5.
>>
>> I am using Dataframes to query Cassandra, do processing and store the
>> result back into Cassandra. The job is being submitted using spark-submit
>> on a cluster of 3 nodes. While doing so I get three WARN messages:
>>
>> WARN  2016-01-28 19:08:18 org.apache.spark.scheduler.TaskSetManager: Lost
>> task 99.0 in stage 2.0 (TID 107, 10.2.1.82): java.io.InvalidClassException:
>> org.apache.spark.sql.types.TimestampType$; unable to create instance
>>
>> Caused by: java.lang.reflect.InvocationTargetException
>>
>> Caused by: java.lang.UnsupportedOperationException: tail of empty list
>>
>>
>> For example, if I am running the same job, for the same input set of
>> data, say 20 times,
>>
>> - 11 times it will run successfully without any WARN messages
>>
>> - 4 times it will run successfully with the above messages
>>
>> - 6 times it will run successfully by randomly giving one or two of
>> the exceptions above
>>
>>
>> In all the 20 runs, the output data is coming as expected and there is no
>> error in that. My concern is, why is it not giving these messages every
>> time I do a spark-submit but only at times. Also, the stack trace does not
>> point to any specific point in my line of code. Full stack trace is as
>> follows. Please let me know if you need any other information
>>
>>
>> WARN  2016-01-28 19:08:24 org.apache.spark.scheduler.TaskSetManager: Lost
>> task 188.0 in stage 16.0 (TID 637, 10.2.1.82):
>> java.io.InvalidClassException: org.apache.spark.sql.types.TimestampType$;
>> unable to create instance
>>
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1788)
>>
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>
>> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
>>
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
>>
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>
>> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>
>> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:497)
>>
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>>
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>
>> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>
>> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>>
>> at
>> 

Spark Distribution of Small Dataset

2016-01-28 Thread Philip Lee
Hi,

A simple question about how Spark distributes a small dataset.

Let's say I have 8 machines with 48 cores and 48GB of RAM as a cluster.
The dataset (stored as ORC by Hive) is small, about 1GB, but I copied it to
HDFS.

1) If spark-sql runs a query over the dataset, which HDFS has distributed
across the machines, what happens to the job? Does one machine handle the
whole dataset because it is so small?

2) Or, since the dataset is already distributed across the machines, does
each machine process its local part and send the result to the master
node?

Could you explain in detail how this works in a distributed setting?

Best,
Phil


Re: Why Spark-sql miss TableScanDesc.FILTER_EXPR_CONF_STR params when I move Hive table to Spark?

2016-01-28 Thread ????????
If Spark supported TableScanDesc.FILTER_EXPR_CONF_STR like Hive does,

we could write SQL like this:

select ydb_sex from ydb_example_shu where ydbpartion='20151110' limit 10
select ydb_sex from ydb_example_shu where ydbpartion='20151110' and 
(ydb_sex='??' or ydb_province='' or ydb_day>='20151217') limit 10
select count(*) from ydb_example_shu where ydbpartion='20151110' and 
(ydb_sex='??' or ydb_province='' or ydb_day>='20151217') limit 10


Since it does not support TableScanDesc.FILTER_EXPR_CONF_STR like Hive, we
have to write SQL like this:

set ya100.spark.filter.ydb_example_shu=ydbpartion='20151110';
select ydb_sex from ydb_example_shu  limit 10

set ya100.spark.filter.ydb_example_shu=ydbpartion='20151110' and (ydb_sex='??' 
or ydb_province='' or ydb_day>='20151217');
select ydb_sex from ydb_example_shu  limit 10

set ya100.spark.filter.ydb_example_shu=ydbpartion='20151110' and (ydb_sex='??' 
or ydb_province='' or ydb_day>='20151217');
select count(*) from ydb_example_shu limit 10

set ya100.spark.filter.ydb_example_shu=ydbpartion='20151110' and (ydb_sex in 
('??','??','',''));
select ydb_sex,ydb_province from ydb_example_shu   limit 10

set ya100.spark.filter.ydb_example_shu=ydbpartion='20151110';
select count(*) from ydb_example_shu   limit 10



-- Original Message --
From: "";;
Sent: 2016-01-28 8:28
To: ""; "user"; "dev";

Subject: Re: Why Spark-sql miss TableScanDesc.FILTER_EXPR_CONF_STR params when I
move Hive table to Spark?



We always use SQL like the one below.

select count(*) from ydb_example_shu where ydbpartion='20151110' and 
(ydb_sex='' or ydb_province='LIAONING' or ydb_day>='20151217') limit 10

Spark doesn't push predicates down through TableScanDesc.FILTER_EXPR_CONF_STR, which
means that every query is a full scan and can't use the index (something like
HBaseStorageHandler).








-- Original Message --
From: "";;
Sent: 2016-01-28 7:27
To: "user"; "dev";

Subject: Why Spark-sql miss TableScanDesc.FILTER_EXPR_CONF_STR params when I move
Hive table to Spark?



Dear Spark users,
I am testing a StorageHandler on Spark SQL,
but I find that TableScanDesc.FILTER_EXPR_CONF_STR is missing, and I need it. Is
there anywhere I can find it?
I really want to get some filter information from Spark SQL so that I can
pre-filter using my index.
So where is
TableScanDesc.FILTER_EXPR_CONF_STR=hive.io.filter.expr.serialized? Is it
missing, or has it been replaced by another method? Thanks, everybody.


For example, I made a custom StorageHandler, as in Hive.

create table xxx(...)
STORED BY 'cn.net.ycloud.ydb.handle.Ya100StorageHandler' 
TBLPROPERTIES(
"ya100.handler.master"="101.200.130.48:8080",
"ya100.handler.table.name"="ydb_example_shu",
"ya100.handler.columns.mapping"="phonenum,usernick,ydb_sex,ydb_province,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,content,ydbpartion,ya100_pipe"
)

In the Ya100StorageHandler code,
I want to use TableScanDesc.FILTER_EXPR_CONF_STR like this:

  String filterExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
  if (filterExprSerialized == null) {
    return "";
    // throw new IOException("can`t found filter condition in your Sql ,at least you must special a field as ydbpartion ");
  } else {
    LOG.info(filterExprSerialized);
    ExprNodeGenericFuncDesc filterExpr =
        Utilities.deserializeExpression(filterExprSerialized);
    LOG.info(filterExpr);
    try {
      return Ya100Utils.parserFilter(filterExpr, info);
    } catch (Throwable e) {
      throw new IOException(e);
    }
  }

Re: Why Spark-sql miss TableScanDesc.FILTER_EXPR_CONF_STR params when I move Hive table to Spark?

2016-01-28 Thread ????????
We always use SQL like the one below.

select count(*) from ydb_example_shu where ydbpartion='20151110' and 
(ydb_sex='' or ydb_province='LIAONING' or ydb_day>='20151217') limit 10

Spark doesn't push predicates down through TableScanDesc.FILTER_EXPR_CONF_STR, which
means that every query is a full scan and can't use the index (something like
HBaseStorageHandler).








-- Original Message --
From: "";;
Sent: 2016-01-28 7:27
To: "user"; "dev";

Subject: Why Spark-sql miss TableScanDesc.FILTER_EXPR_CONF_STR params when I move
Hive table to Spark?



Dear Spark users,
I am testing a StorageHandler on Spark SQL,
but I find that TableScanDesc.FILTER_EXPR_CONF_STR is missing, and I need it. Is
there anywhere I can find it?
I really want to get some filter information from Spark SQL so that I can
pre-filter using my index.
So where is
TableScanDesc.FILTER_EXPR_CONF_STR=hive.io.filter.expr.serialized? Is it
missing, or has it been replaced by another method? Thanks, everybody.


For example, I made a custom StorageHandler, as in Hive.

create table xxx(...)
STORED BY 'cn.net.ycloud.ydb.handle.Ya100StorageHandler' 
TBLPROPERTIES(
"ya100.handler.master"="101.200.130.48:8080",
"ya100.handler.table.name"="ydb_example_shu",
"ya100.handler.columns.mapping"="phonenum,usernick,ydb_sex,ydb_province,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,content,ydbpartion,ya100_pipe"
)

In the Ya100StorageHandler code,
I want to use TableScanDesc.FILTER_EXPR_CONF_STR like this:

  String filterExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
  if (filterExprSerialized == null) {
    return "";
    // throw new IOException("can`t found filter condition in your Sql ,at least you must special a field as ydbpartion ");
  } else {
    LOG.info(filterExprSerialized);
    ExprNodeGenericFuncDesc filterExpr =
        Utilities.deserializeExpression(filterExprSerialized);
    LOG.info(filterExpr);
    try {
      return Ya100Utils.parserFilter(filterExpr, info);
    } catch (Throwable e) {
      throw new IOException(e);
    }
  }

Re: can't find trackStateByKey in 1.6.0 jar?

2016-01-28 Thread Sebastian Piu
That explains it! Thanks :)

On Thu, Jan 28, 2016 at 9:52 AM, Tathagata Das 
wrote:

> It was renamed to mapWithState when 1.6.0 was released. :)
>
>
>
> On Thu, Jan 28, 2016 at 1:51 AM, Sebastian Piu 
> wrote:
>
>> I wanted to give the new trackStateByKey method a try, but I'm missing
>> something very obvious here as I can't see it in the 1.6.0 jar. Is there
>> anything in particular I have to do, or is Maven just playing tricks on me?
>>
>> This is the dependency I'm using:
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-streaming_2.10</artifactId>
>>   <version>1.6.0</version>
>> </dependency>
>>
>>
>>
>


Re: bug for large textfiles on windows

2016-01-28 Thread Christopher Bourez
Dears,

I recompiled Spark on Windows, and it seems to work better. My problem with
PySpark remains:
https://issues.apache.org/jira/browse/SPARK-12261

I do not know how to debug this. It seems to be linked to Pickle, the
garbage collector... I would like to clear the Spark context to see if I
can gain anything.

Christopher Bourez
06 17 17 50 60

On Mon, Jan 25, 2016 at 10:14 PM, Christopher Bourez <
christopher.bou...@gmail.com> wrote:

> Here is a pic of memory
> If I put --conf spark.driver.memory=3g, it increases the displayed memory,
> but the problem remains... for a file that is only 13M.
>
> Christopher Bourez
> 06 17 17 50 60
>
> On Mon, Jan 25, 2016 at 10:06 PM, Christopher Bourez <
> christopher.bou...@gmail.com> wrote:
>
>> The same problem occurs on my desktop at work.
>> What's great with AWS Workspace is that you can easily reproduce it.
>>
>> I created the test file with commands :
>>
>> for i in {0..30}; do
>>   VALUE="$RANDOM"
>>   for j in {0..6}; do
>> VALUE="$VALUE;$RANDOM";
>>   done
>>   echo $VALUE >> test.csv
>> done
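For anyone reproducing this on Windows, where the shell loop above is not directly available, here is a minimal Python sketch of the same idea: each line holds 8 semicolon-separated random integers in the range of bash's $RANDOM (0 to 32767). The function name `write_test_csv` and its parameters are made up for illustration, and the row count is left as an argument because the count needed to reach the 13 MB file is an assumption, not something stated in the thread.

```python
import random

def write_test_csv(path, rows, fields_per_row=8, seed=None):
    """Write `rows` lines of semicolon-separated random integers to `path`."""
    rng = random.Random(seed)
    with open(path, "w") as out:
        for _ in range(rows):
            # Mirrors VALUE="$RANDOM" followed by seven more ";$RANDOM" appends.
            values = [str(rng.randint(0, 32767)) for _ in range(fields_per_row)]
            out.write(";".join(values) + "\n")
```

Calling `write_test_csv("test.csv", rows=300000)` should give a file in the low tens of megabytes, since each line is a few dozen bytes; adjust the row count to reach the size you need.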
>>
>> Christopher Bourez
>> 06 17 17 50 60
>>
>> On Mon, Jan 25, 2016 at 10:01 PM, Christopher Bourez <
>> christopher.bou...@gmail.com> wrote:
>>
>>> Josh,
>>>
>>> Thanks a lot !
>>>
>>> You can download a video I created :
>>> https://s3-eu-west-1.amazonaws.com/christopherbourez/public/video.mov
>>>
>>> I created a sample file of 13 MB as explained :
>>> https://s3-eu-west-1.amazonaws.com/christopherbourez/public/test.csv
>>>
>>> Here are the commands I did :
>>>
>>> I created an Aws Workspace with Windows 7 (that I can share you if you'd
>>> like) with Standard instance, 2GiB RAM
>>> On this instance :
>>> I downloaded spark (1.5 or 1.6 same pb) with hadoop 2.6
>>> installed java 8 jdk
>>> downloaded python 2.7.8
>>>
>>> downloaded the sample file
>>> https://s3-eu-west-1.amazonaws.com/christopherbourez/public/test.csv
>>>
>>> And then the command lines I launch are :
>>> bin\pyspark --master local[1]
>>> sc.textFile("test.csv").take(1)
>>>
>>> As you can see, sc.textFile("test.csv", 2000).take(1) works well
>>>
>>> Thanks a lot !
>>>
>>>
>>> Christopher Bourez
>>> 06 17 17 50 60
>>>
>>> On Mon, Jan 25, 2016 at 8:02 PM, Josh Rosen 
>>> wrote:
>>>
 Hi Christopher,

 What would be super helpful here is a standalone reproduction. Ideally
 this would be a single Scala file or set of commands that I can run in
 `spark-shell` in order to reproduce this. Ideally, this code would generate
 a giant file, then try to read it in a way that demonstrates the bug. If
 you have such a reproduction, could you attach it to that JIRA ticket?
 Thanks!

 On Mon, Jan 25, 2016 at 7:53 AM Christopher Bourez <
 christopher.bou...@gmail.com> wrote:

> Dears,
>
> I would like to re-open a case for a potential bug (its current status is
> resolved, but it does not seem to be):
>
> https://issues.apache.org/jira/browse/SPARK-12261
>
> I believe there is something wrong with the memory management under
> Windows.
>
> It makes no sense to be limited to files smaller than a few MB...
>
> Do not hesitate to ask me questions if you try to help and reproduce
> the bug,
>
> Best
>
> Christopher Bourez
> 06 17 17 50 60
>

>>>
>>
>


Re: can't find trackStateByKey in 1.6.0 jar?

2016-01-28 Thread Tathagata Das
It was renamed to mapWithState when 1.6.0 was released. :)



On Thu, Jan 28, 2016 at 1:51 AM, Sebastian Piu 
wrote:

> I wanted to give the new trackStateByKey method a try, but I'm missing
> something very obvious here as I can't see it in the 1.6.0 jar. Is there
> anything in particular I have to do, or is Maven just playing tricks on me?
>
> This is the dependency I'm using:
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-streaming_2.10</artifactId>
>   <version>1.6.0</version>
> </dependency>
>
>
>


Re: GraphX can show graph?

2016-01-28 Thread Sahil Sareen
Try Neo4j for visualization; GraphX does a pretty good job at distributed
graph processing.

On Thu, Jan 28, 2016 at 12:42 PM, Balachandar R.A.  wrote:

> Hi
>
> I am new to GraphX. I have a simple csv file which I could load and
> compute a few graph statistics from. However, I am not sure whether it is possible
> to create and show a graph (for visualization purposes) using GraphX. Any
> pointer to a tutorial or information connected to this would be really helpful.
>
> Thanks and regards
> Bala
>


can't find trackStateByKey in 1.6.0 jar?

2016-01-28 Thread Sebastian Piu
I wanted to give the new trackStateByKey method a try, but I'm missing
something very obvious here as I can't see it in the 1.6.0 jar. Is there
anything in particular I have to do, or is Maven just playing tricks on me?

This is the dependency I'm using:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.0</version>
</dependency>


“java.io.IOException: Class not found” on long running Streaming application

2016-01-28 Thread Patrick McGloin
I am getting the exception below on a long running Spark Streaming
application. The exception can occur after a few minutes, but it may also
not happen for days. This is with pretty consistent input data.

I have seen this Jira ticket
(https://issues.apache.org/jira/browse/SPARK-6152) but I don't think it is
the same issue. That is a java.lang.IllegalArgumentException and this is
java.io.IOException: Class not found.

My application is streaming data and writing to Parquet using Spark SQL.

I am using Spark 1.5.2. Any ideas?

28-01-2016 09:36:00 ERROR JobScheduler:96 - Error generating jobs for
time 145397376 ms
java.io.IOException: Class not found
at 
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.a(Unknown
Source)
at 
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.<init>(Unknown
Source)
at 
org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)
at 
org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:81)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.map(RDD.scala:317)
at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
at 
org.apache.spark.streaming.dstream.MappedDStream$$anonfun$compute$1.apply(MappedDStream.scala:35)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at 
org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStrea


Stream S3 server to Cassandra

2016-01-28 Thread Sateesh Karuturi
Hello, could anyone please help me stream XML files from an S3
server to a Cassandra DB using Spark Streaming in Java? At present I am using
Spark core to do that job, but the problem is that I have to run it every 15
minutes, which is why I am looking at Spark Streaming.


Explaination for info shown in UI

2016-01-28 Thread Sachin Aggarwal
Hi

I am running a streaming word count with Kafka,
using one test topic with 2 partitions.
My cluster has three Spark executors.

Each batch is 10 seconds long.

For every batch (e.g. *batch time 02:51:00* below) I see 3 entries in the Spark UI,
as shown below.

My questions:
1) As the label of the first column says Job Id, does Spark submit 3 jobs for each
batch?
2) When I decreased the number of executors/nodes, the job count also changed.
What is the relation to the number of executors?
3) Only one job actually executes the stage; the other two show it as skipped. Why
were the other jobs created?

Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks (for all stages): Succeeded/Total
221 | Streaming job from [output operation 0, batch time 02:51:00] print at StreamingWordCount.scala:54 | 2016/01/28 02:51:00 | 46 ms | 1/1 (1 skipped) | 1/1 (3 skipped)
220 | Streaming job from [output operation 0, batch time 02:51:00] print at StreamingWordCount.scala:54 | 2016/01/28 02:51:00 | 47 ms | 1/1 (1 skipped) | 4/4 (3 skipped)
219 | Streaming job from [output operation 0, batch time 02:51:00] print at StreamingWordCount.scala:54 | 2016/01/28 02:51:00 | 48 ms | 2/2 | 4/4

-- 

Thanks & Regards

Sachin Aggarwal
7760502772


Why Spark-sql miss TableScanDesc.FILTER_EXPR_CONF_STR params when I move Hive table to Spark?

2016-01-28 Thread ????????
Dear Spark users,
I am testing a StorageHandler on Spark SQL,
but I find that TableScanDesc.FILTER_EXPR_CONF_STR is missing, and I need it. Is
there anywhere I can find it?
I really want to get some filter information from Spark SQL so that I can
pre-filter using my index.
So where is
TableScanDesc.FILTER_EXPR_CONF_STR=hive.io.filter.expr.serialized? Is it
missing, or has it been replaced by another method? Thanks, everybody.


For example, I made a custom StorageHandler, as in Hive.

create table xxx(...)
STORED BY 'cn.net.ycloud.ydb.handle.Ya100StorageHandler' 
TBLPROPERTIES(
"ya100.handler.master"="101.200.130.48:8080",
"ya100.handler.table.name"="ydb_example_shu",
"ya100.handler.columns.mapping"="phonenum,usernick,ydb_sex,ydb_province,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,content,ydbpartion,ya100_pipe"
)

In the Ya100StorageHandler code,
I want to use TableScanDesc.FILTER_EXPR_CONF_STR like this:

  String filterExprSerialized = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
  if (filterExprSerialized == null) {
    return "";
    // throw new IOException("can`t found filter condition in your Sql ,at least you must special a field as ydbpartion ");
  } else {
    LOG.info(filterExprSerialized);
    ExprNodeGenericFuncDesc filterExpr =
        Utilities.deserializeExpression(filterExprSerialized);
    LOG.info(filterExpr);
    try {
      return Ya100Utils.parserFilter(filterExpr, info);
    } catch (Throwable e) {
      throw new IOException(e);
    }
  }

Re: Spark streaming flow control and back pressure

2016-01-28 Thread Lin Zhao
I'm using branch-1.6, built for Scala 2.11 yesterday. Below is the part of my actor receiver that
stores data. The log reports millions of stored messages while the job is apparently back-pressured
according to the UI (i.e. 2000 per 10s batch).


store((key, msg))
if (storeCount.incrementAndGet() % 10 == 0) {
  logger.info(s"Stored ${storeCount.get()} messages to spark")
}

From: Iulian Dragoș
Date: Thursday, January 28, 2016 at 5:33 AM
To: Lin Zhao
Cc: "user@spark.apache.org"
Subject: Re: Spark streaming flow control and back pressure

Calling `store` should get you there. What version of Spark are you using? Can 
you share your code?

iulian

On Thu, Jan 28, 2016 at 2:28 AM, Lin Zhao wrote:
I have an actor receiver that reads data and calls "store()" to save data to
Spark. I was hoping spark.streaming.receiver.maxRate and
spark.streaming.backpressure.enabled would help me block the method when needed to
avoid overflowing the pipeline. But they don't. My actor pumps millions of
lines to Spark while backpressure and the rate limit are in effect. While this
data flows slowly into the input blocks, the data already created sits around and
causes memory problems.

Is there a guideline on how to handle this? What's the best way for my actor to know
it should slow down so it doesn't keep creating millions of messages? A blocking
store() call seems acceptable.
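
The blocking behaviour asked about above can be illustrated outside Spark with a bounded buffer. This is only a sketch of the general producer/consumer idea using Python's standard `queue.Queue`; it is not Spark's receiver API and says nothing about how `store()` actually behaves under `spark.streaming.receiver.maxRate`. The names `run_pipeline`, `capacity` and `consume_delay` are invented for the example.

```python
import queue
import threading
import time

def run_pipeline(total=50, capacity=5, consume_delay=0.001):
    """Push `total` items through a bounded buffer with a slow consumer.

    Returns the consumed items and the largest buffer size the producer saw.
    """
    buf = queue.Queue(maxsize=capacity)
    consumed = []

    def consumer():
        while True:
            item = buf.get()
            if item is None:  # sentinel: producer is finished
                break
            time.sleep(consume_delay)  # simulate slow downstream processing
            consumed.append(item)

    worker = threading.Thread(target=consumer)
    worker.start()

    max_seen = 0
    for i in range(total):
        buf.put(i)  # blocks while the buffer is full, slowing the producer down
        max_seen = max(max_seen, buf.qsize())
    buf.put(None)
    worker.join()
    return consumed, max_seen
```

Because `put` blocks when the buffer is full, the producer can never run more than `capacity` items ahead of the consumer, which is the property a blocking store() call would give the actor.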

Thanks, Lin



--

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com



Streaming: LeaseExpiredException when writing checkpoint

2016-01-28 Thread Lin Zhao
I'm seeing this error in the driver when running a streaming job. I'm not sure if
it's critical.

It happens maybe half of the time a checkpoint is saved. There are retries in the log,
but sometimes it ends with "Could not write checkpoint for time 145400632 ms
to file
hdfs://ip-172-31-35-122.us-west-2.compute.internal:8020/user/exabeam/checkpoint-145400632".
Any help in understanding this error is appreciated.


16/01/28 18:38:40 INFO CheckpointWriter: Saving checkpoint for time 
145400632 ms to file 
'hdfs://ip-172-31-35-122.us-west-2.compute.internal:8020/user/exabeam/checkpoint-145400632'

16/01/28 18:38:40 WARN DFSClient: DataStreamer Exception

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on /user/exabeam/temp (inode 2058161): File does not exist. [Lease.  
Holder: DFSClient_NONMAPREDUCE_762594086_1, pendingcreates: 2]

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3605)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3402)

at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3258)

at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:668)

at 
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:212)

at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:483)

at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)

at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)

at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:415)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)

at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)


at org.apache.hadoop.ipc.Client.call(Client.java:1468)

at org.apache.hadoop.ipc.Client.call(Client.java:1399)

at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)

at com.sun.proxy.$Proxy19.addBlock(Unknown Source)

at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)

at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)

at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)

at com.sun.proxy.$Proxy20.addBlock(Unknown Source)

at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1544)

at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1361)

at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:600)



Re: Spark, Mesos, Docker and S3

2016-01-28 Thread Mao Geng
Sathish,

I guess the mesos resources are not enough to run your job. You might want
to check the mesos log to figure out why.

I tried to run the docker image with "--conf spark.mesos.coarse=false" and
"true". Both are fine.

Best,
Mao

On Wed, Jan 27, 2016 at 5:00 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Hi,
>
> On the same Spark/Mesos/Docker setup, I am getting warning "Initial Job
> has not accepted any resources; check your cluster UI to ensure that
> workers are registered and have sufficient resources". I am running in
> coarse grained mode. Any pointers on how to fix this issue? Please help. I
> have updated both docker.properties and spark-default.conf with  
> spark.mesos.executor.docker.image
> and other properties.
>
>
> Thanks
>
> Sathish
>
> On Wed, Jan 27, 2016 at 9:58 AM Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Thanks a lot for your info! I will try this today.
>> On Wed, Jan 27, 2016 at 9:29 AM Mao Geng  wrote:
>>
>>> Hi Sathish,
>>>
>>> The docker image is normal, no AWS profile included.
>>>
>>> When the driver container runs with --net=host, the driver host's AWS
>>> profile will take effect so that the driver can access the protected s3
>>> files.
>>>
>>> Similarly,  Mesos slaves also run Spark executor docker container in
>>> --net=host mode, so that the AWS profile of Mesos slaves will take effect.
>>>
>>> Hope it helps,
>>> Mao
>>>
>>> On Jan 26, 2016, at 9:15 PM, Sathish Kumaran Vairavelu <
>>> vsathishkuma...@gmail.com> wrote:
>>>
>>> Hi Mao,
>>>
>>> I want to check on accessing the S3 from Spark docker in Mesos.  The EC2
>>> instance that I am using has the AWS profile/IAM included.  Should we build
>>> the docker image with any AWS profile settings or --net=host docker option
>>> takes care of it?
>>>
>>> Please help
>>>
>>>
>>> Thanks
>>>
>>> Sathish
>>>
>>> On Tue, Jan 26, 2016 at 9:04 PM Mao Geng  wrote:
>>>
 Thank you very much, Jerry!

 I changed to "--jars
 /opt/spark/lib/hadoop-aws-2.7.1.jar,/opt/spark/lib/aws-java-sdk-1.7.4.jar"
 then it worked like a charm!

 From Mesos task logs below, I saw Mesos executor downloaded the jars
 from the driver, which is a bit unnecessary (as the docker image already
 has them), but that's ok - I am happy seeing Spark + Mesos + Docker + S3
 worked together!

 Thanks,
 Mao

 16/01/27 02:54:45 INFO Executor: Using REPL class URI: 
 http://172.16.3.98:33771
 16/01/27 02:55:12 INFO CoarseGrainedExecutorBackend: Got assigned task 0
 16/01/27 02:55:12 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
 16/01/27 02:55:12 INFO Executor: Fetching 
 http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar with timestamp 
 1453863280432
 16/01/27 02:55:12 INFO Utils: Fetching 
 http://172.16.3.98:3850/jars/hadoop-aws-2.7.1.jar to 
 /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/fetchFileTemp1518118694295619525.tmp
 16/01/27 02:55:12 INFO Utils: Copying 
 /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/-19880839621453863280432_cache
  to /./hadoop-aws-2.7.1.jar
 16/01/27 02:55:12 INFO Executor: Adding file:/./hadoop-aws-2.7.1.jar to 
 class loader
 16/01/27 02:55:12 INFO Executor: Fetching 
 http://172.16.3.98:3850/jars/aws-java-sdk-1.7.4.jar with timestamp 
 1453863280472
 16/01/27 02:55:12 INFO Utils: Fetching 
 http://172.16.3.98:3850/jars/aws-java-sdk-1.7.4.jar to 
 /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/fetchFileTemp8868621397726761921.tmp
 16/01/27 02:55:12 INFO Utils: Copying 
 /tmp/spark-7b8e1681-8a62-4f1d-9e11-fdf8062b1b08/8167072821453863280472_cache
  to /./aws-java-sdk-1.7.4.jar
 16/01/27 02:55:12 INFO Executor: Adding file:/./aws-java-sdk-1.7.4.jar to 
 class loader

 On Tue, Jan 26, 2016 at 5:40 PM, Jerry Lam 
 wrote:

> Hi Mao,
>
> Can you try --jars to include those jars?
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 26 Jan, 2016, at 7:02 pm, Mao Geng  wrote:
>
> Hi there,
>
> I am trying to run Spark on Mesos using a Docker image as executor, as
> mentioned
> http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-docker-support
> .
>
> I built a docker image using the following Dockerfile (which is based
> on
> https://github.com/apache/spark/blob/master/docker/spark-mesos/Dockerfile
> ):
>
> FROM mesosphere/mesos:0.25.0-0.2.70.ubuntu1404
>
> # Update the base ubuntu image with dependencies needed for Spark
> RUN apt-get update && \
> apt-get install -y python libnss3 openjdk-7-jre-headless curl
>
> RUN curl
> http://www.carfab.com/apachesoftware/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
> | tar -xzC /opt && \
> ln -s /opt/spark-1.6.0-bin-hadoop2.6 

Re: spark.kryo.classesToRegister

2016-01-28 Thread Jagrut Sharma
I have run into this issue (
https://issues.apache.org/jira/browse/SPARK-10251) with kryo on Spark
version 1.4.1.
Just something to be aware of when setting config to 'true'.

Thanks.
--
Jagrut


On Thu, Jan 28, 2016 at 6:32 AM, Jim Lohse 
wrote:

> You are only required to add classes to Kryo (compulsorily) if you use a
> specific setting:
>
> // require registration of all classes with Kryo
> .set("spark.kryo.registrationRequired", "true")
>
> Here's an example of my setup. I think this is the best approach because
> it forces me to really think about what I am serializing:
>
> // the Kryo serializer wants all classes that need to be serialized to be registered
> Class[] kryoClassArray = new Class[]{DropResult.class,
> DropEvaluation.class, PrintHetSharing.class};
> SparkConf sparkConf = new SparkConf()
> .setAppName("MyAppName")
> .setMaster("spark://ipaddress:7077")
> // now for the Kryo stuff
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> // require registration of all classes with Kryo
> .set("spark.kryo.registrationRequired", "true")
> // don't forget to register ALL classes or you will get an error
> .registerKryoClasses(kryoClassArray);
>
> On 01/27/2016 12:58 PM, Shixiong(Ryan) Zhu wrote:
>
> It depends. The default Kryo serializer cannot handle all cases. If you
> encounter any issue, you can follow the Kryo doc to set up custom
> serializer: https://github.com/EsotericSoftware/kryo/blob/master/README.md
>
> On Wed, Jan 27, 2016 at 3:13 AM, amit tewari 
> wrote:
>>
>> This is what I have added in my code:
>>
>>
>>
>> rdd.persist(StorageLevel.MEMORY_ONLY_SER())
>>
>> conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
>>
>>
>>
>> Do I compulsorily need to do anything via : spark.kryo.classesToRegister?
>>
>> Or the above code sufficient to achieve performance gain using Kryo
>> serialization?
>>
>>
>>
>> Thanks
>>
>> Amit
>>
>


Setting up data for columnsimilarity

2016-01-28 Thread rcollich
Hi all,

I need to be able to find the cosine similarity of a series of vectors (for
the sake of argument, let's say that every vector is a tweet). However, I'm
having an issue with how to actually prepare my data to use the
columnSimilarities function. I'm receiving these vectors in row format and I
can't find any form of "transpose" function that works well. Has anyone
run into an issue similar to this?
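
To make the shape of the problem concrete, here is a small plain-Python sketch, not Spark code: it transposes row vectors so that each original row becomes a column, then computes pairwise cosine similarity between columns, which is the quantity a column-similarity routine produces. The helper names `transpose` and `column_cosine_similarities` are made up for the example, and at real tweet scale the transpose itself would have to be done in a distributed way.

```python
import math

def transpose(rows):
    """Turn a list of equal-length rows into a list of columns."""
    return [list(col) for col in zip(*rows)]

def column_cosine_similarities(matrix):
    """Return {(i, j): cosine similarity} for every column pair with i < j."""
    cols = transpose(matrix)
    norms = [math.sqrt(sum(x * x for x in col)) for col in cols]
    sims = {}
    for i in range(len(cols)):
        for j in range(i + 1, len(cols)):
            dot = sum(a * b for a, b in zip(cols[i], cols[j]))
            denom = norms[i] * norms[j]
            # A zero vector has no direction; report 0.0 rather than divide by zero.
            sims[(i, j)] = dot / denom if denom else 0.0
    return sims

# Three toy "tweets" as row vectors; transposing makes each tweet a column.
tweets = [[1, 0, 1], [1, 0, 1], [0, 1, 0]]
similarities = column_cosine_similarities(transpose(tweets))
```

Here `similarities` is keyed by pairs of tweet indices, so identical tweets (0 and 1) come out with similarity close to 1 and tweets with no terms in common come out as 0.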

Thank you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-data-for-columnsimilarity-tp26098.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Python UDFs

2016-01-28 Thread Stefan Panayotov
Thanks, Jacob.
 
But it seems that Python requires the RETURN Type to be specified.
And DenseVector is not a valid return type, or I do not know the correct type 
to put in.
Shall I try ArrayType?
Any ideas?
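
One direction that may be worth trying, sketched here under the assumption of Spark 1.6's PySpark, where `pyspark.mllib.linalg` provides `Vectors` and `VectorUDT`: pass `VectorUDT()` as the return type of the udf instead of the DenseVector class itself. The function names `pair_values` and `build_determine_vector` are made up for the example, and the Spark imports are kept inside the builder so that the plain-Python part can be checked without a Spark installation.

```python
def pair_values(a, b):
    """Plain-Python core of the UDF: the two inputs as a list of floats."""
    return [float(a), float(b)]

def build_determine_vector():
    """Wrap pair_values as a Spark SQL UDF that returns a dense vector.

    Needs pyspark on the path; VectorUDT is the SQL type for MLlib vectors.
    """
    from pyspark.mllib.linalg import Vectors, VectorUDT
    from pyspark.sql.functions import udf
    return udf(lambda a, b: Vectors.dense(pair_values(a, b)), VectorUDT())
```

With a DataFrame `df` that has numeric columns `a` and `b` (hypothetical names), usage would look like `df.withColumn("vec", build_determine_vector()(df["a"], df["b"]))`. Returning a plain list with `ArrayType(DoubleType())` as the return type is the alternative if a vector column is not actually needed.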



Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net

 
> Date: Wed, 27 Jan 2016 15:03:06 -0800
> Subject: Re: Python UDFs
> From: ja...@odersky.com
> To: spanayo...@msn.com
> CC: user@spark.apache.org
> 
> Have you checked:
> 
> - the mllib doc for python
> https://spark.apache.org/docs/1.6.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.DenseVector
> - the udf doc 
> https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.functions.udf
> 
> You should be fine in returning a DenseVector as the return type of
> the udf, as it provides access to a schema.
> 
> These are just directions to explore, I haven't used PySpark myself.
> 
> On Wed, Jan 27, 2016 at 10:38 AM, Stefan Panayotov  wrote:
> > Hi,
> >
> > I have defined a UDF in Scala like this:
> >
> > import org.apache.spark.mllib.linalg.Vector
> > import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary,
> > Statistics}
> > import org.apache.spark.mllib.linalg.DenseVector
> >
> > val determineVector = udf((a: Double, b: Double) => {
> > val data: Array[Double] = Array(a,b)
> > val dv = new DenseVector(data)
> > dv
> >   })
> >
> > How can I write the corresponding function in Python/PySpark?
> >
> > Thanks for your help
> >
> > Stefan Panayotov, PhD
> > Home: 610-355-0919
> > Cell: 610-517-5586
> > email: spanayo...@msn.com
> > spanayo...@outlook.com
> > spanayo...@comcast.net
> >
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

Programmatically launching spark on yarn-client mode no longer works in spark 1.5.2

2016-01-28 Thread Nirav Patel
Hi, we were using Spark 1.3.1 and launching our Spark jobs in yarn-client
mode programmatically by creating SparkConf and SparkContext objects
manually. This was inspired by the self-contained application example
here:

https://spark.apache.org/docs/1.5.2/quick-start.html#self-contained-applications



The only additional configuration we provide is YARN-related,
such as executor instances, cores, etc.

However, after upgrading to Spark 1.5.2, the above application breaks on the line
`val sparkContext = new SparkContext(sparkConf)`

16/01/28 17:38:35 ERROR util.Utils: Uncaught exception in thread main

java.lang.NullPointerException

at
org.apache.spark.network.netty.NettyBlockTransferService.close(NettyBlockTransferService.scala:152)

at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)

at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)

at
org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1749)

at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)

at org.apache.spark.SparkContext.stop(SparkContext.scala:1748)

at org.apache.spark.SparkContext.(SparkContext.scala:593)


So is this approach still supposed to work? Or do I must use SparkLauncher
class with spark 1.5.2?


Thanks



Re: Programmatically launching spark on yarn-client mode no longer works in spark 1.5.2

2016-01-28 Thread Nirav Patel
Thanks Saisai. I saw the following in the YARN container logs. I think that
is what killed the SparkContext.

16/01/28 17:38:29 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
*Unknown/unsupported param List*(--properties-file,
/tmp/hadoop-xactly/nm-local-dir/usercache/nir/appcache/application_1453752281504_3427/container_1453752281504_3427_01_02/__spark_conf__/__spark_conf__.properties)

Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
Options:
  --jar JAR_PATH         Path to your application's JAR file
  --class CLASS_NAME     Name of your application's main class
  --primary-py-file      A main Python file
  --py-files PY_FILES    Comma-separated list of .zip, .egg, or .py files to
                         place on the PYTHONPATH for Python apps.
  --args ARGS            Arguments to be passed to your application's main class.
                         Multiple invocations are possible, each will be
                         passed in order.
  --num-executors NUM    Number of executors to start (Default: 2)
  --executor-cores NUM   Number of cores for the executors (Default: 1)
  --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)



But if you are saying that creating a SparkContext manually in the
application still works, then I'll investigate more on my side. Before I dig
further, I just wanted to know whether it was still supported.

Nir

On Thu, Jan 28, 2016 at 7:47 PM, Saisai Shao  wrote:

> I think I met this problem before, this problem might be due to some race
> conditions in exit period. The way you mentioned is still valid, this
> problem only occurs when stopping the application.
>
> Thanks
> Saisai
>
> On Fri, Jan 29, 2016 at 10:22 AM, Nirav Patel 
> wrote:
>
>> Hi, we were using spark 1.3.1 and launching our spark jobs on yarn-client
>> mode programmatically via creating a sparkConf and sparkContext object
>> manually. It was inspired from spark self-contained application example
>> here:
>>
>>
>> https://spark.apache.org/docs/1.5.2/quick-start.html#self-contained-applications\
>>
>>
>>
>> Only additional configuration we would provide would be all related to
>> yarn like executor instance, cores etc.
>>
>> However after upgrading to spark 1.5.2 above application breaks on a line
>> `val sparkContext = new SparkContext(sparkConf)`
>>
>> 16/01/28 17:38:35 ERROR util.Utils: Uncaught exception in thread main
>>
>> java.lang.NullPointerException
>>
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService.close(NettyBlockTransferService.scala:152)
>>
>> at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
>>
>> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
>>
>> at
>> org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1749)
>>
>> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
>>
>> at org.apache.spark.SparkContext.stop(SparkContext.scala:1748)
>>
>> at org.apache.spark.SparkContext.(SparkContext.scala:593)
>>
>>
>> So is this approach still supposed to work? Or do I must use
>> SparkLauncher class with spark 1.5.2?
>>
>>
>> Thanks
>>
>>
>>
>
>
>



Persisting of DataFrames in transformation workflows

2016-01-28 Thread Gireesh Puthumana
Hi All,

I am trying to run a series of transformations over 3 DataFrames. After each
transformation, I want to persist the DF and save it to a text file. The
steps I am following are:

*Step0:*
Create DF1
Create DF2
Create DF3
Create DF4
(no persist no save yet)

*Step1:*
Create RESULT-DF1 by joining DF1 and DF2
Persist it to disk and memory
Save it to text file

*Step2:*
Create RESULT-DF2 by joining RESULT-DF1 and DF3
Persist it to disk and memory
Save it to text file

*Step3:*
Create RESULT-DF3 by joining RESULT-DF2 and DF4
Persist it to disk and memory
Save it to text file

*Observation:*
Number of tasks created at Step1 is 601
Number of tasks created at Step2 is 1004 (Didn't skip anything)
Number of tasks created at Step3 is 1400 (Skipped 400 tasks)

As a different approach, I broke the above steps into three separate runs, i.e.:

   - Start, Load DF1 and DF2, Do Step1, Save RESULT-DF1 & exit
   - Start, Load DF3, Load RESULT-DF1 from file, do Step2, save RESULT-DF2
   & exit
   - Start, Load DF4, Load RESULT-DF2 from file, do Step3, save RESULT-DF3
   & exit

The latter approach runs faster.

*My question is:*

   1. Am I missing something on the persisting side in the first approach?
   2. Why didn't the Step2 run just use the result from Step1, without redoing
   all of its tasks, even after persisting (with only 601 tasks instead of 1004)?
   3. What are some good reads about best practices when implementing such a
   series of transformation workflows?

Thanks in advance,
Gireesh


Re: Programmatically launching spark on yarn-client mode no longer works in spark 1.5.2

2016-01-28 Thread Saisai Shao
Sorry, I didn't notice this mail. This looks like a wrong command-line
argument problem; please ignore my previous comment.

On Fri, Jan 29, 2016 at 11:58 AM, Nirav Patel  wrote:

> Thanks Saisai. I saw following in yarn container logs. I think that killed
> sparkcontext.
>
> 16/01/28 17:38:29 INFO yarn.ApplicationMaster: Registered signal handlers for 
> [TERM, HUP, INT]*Unknown/unsupported param List*(--properties-file, 
> /tmp/hadoop-xactly/nm-local-dir/usercache/nir/appcache/application_1453752281504_3427/container_1453752281504_3427_01_02/__spark_conf__/__spark_conf__.properties)
>
> Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
> Options:
>   --jar JAR_PATH   Path to your application's JAR file
>   --class CLASS_NAME   Name of your application's main class
>   --primary-py-fileA main Python file
>   --py-files PY_FILES  Comma-separated list of .zip, .egg, or .py files to
>place on the PYTHONPATH for Python apps.
>   --args ARGS  Arguments to be passed to your application's main 
> class.
>Multiple invocations are possible, each will be passed 
> in order.
>   --num-executors NUMNumber of executors to start (Default: 2)
>   --executor-cores NUM   Number of cores for the executors (Default: 1)
>   --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)
>
>
>
> But if you are saying creating sparkcontext manually in your application
> still works then I'll investigate more on my side. It just before I dig
> more I wanted to know if it was still supported.
>
> Nir
>
> On Thu, Jan 28, 2016 at 7:47 PM, Saisai Shao 
> wrote:
>
>> I think I met this problem before, this problem might be due to some race
>> conditions in exit period. The way you mentioned is still valid, this
>> problem only occurs when stopping the application.
>>
>> Thanks
>> Saisai
>>
>> On Fri, Jan 29, 2016 at 10:22 AM, Nirav Patel 
>> wrote:
>>
>>> Hi, we were using spark 1.3.1 and launching our spark jobs on
>>> yarn-client mode programmatically via creating a sparkConf and sparkContext
>>> object manually. It was inspired from spark self-contained application
>>> example here:
>>>
>>>
>>> https://spark.apache.org/docs/1.5.2/quick-start.html#self-contained-applications\
>>>
>>>
>>>
>>> Only additional configuration we would provide would be all related to
>>> yarn like executor instance, cores etc.
>>>
>>> However after upgrading to spark 1.5.2 above application breaks on a
>>> line `val sparkContext = new SparkContext(sparkConf)`
>>>
>>> 16/01/28 17:38:35 ERROR util.Utils: Uncaught exception in thread main
>>>
>>> java.lang.NullPointerException
>>>
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService.close(NettyBlockTransferService.scala:152)
>>>
>>> at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
>>>
>>> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
>>>
>>> at
>>> org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1749)
>>>
>>> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
>>>
>>> at org.apache.spark.SparkContext.stop(SparkContext.scala:1748)
>>>
>>> at org.apache.spark.SparkContext.(SparkContext.scala:593)
>>>
>>>
>>> So is this approach still supposed to work? Or do I must use
>>> SparkLauncher class with spark 1.5.2?
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>
>>
>>
>
>
>
>


How to filter the isolated vertexes in Graphx

2016-01-28 Thread Zhang, Jingyu
I am trying to filter out vertices that have no edges connecting them to
other vertices. How can I filter out those isolated vertices in GraphX?

Thanks,

Jingyu
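
One way to think about the question above, sketched in plain Java rather than GraphX: a vertex is isolated when it is not an endpoint of any edge. The class and method names below are hypothetical, and treating a self-loop as "not connected to others" is an assumption. In GraphX itself the analogous information comes from `graph.degrees`, which only contains vertices that have at least one edge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of GraphX: finds vertices with no edge to another vertex.
final class IsolatedVertices {

    /** Returns the vertex ids that are not an endpoint of any non-loop edge, in input order. */
    static List<Long> isolated(List<Long> vertexIds, List<long[]> edges) {
        Set<Long> connected = new HashSet<>();
        for (long[] edge : edges) {
            // Assumption: a self-loop does not connect a vertex to any *other* vertex.
            if (edge[0] != edge[1]) {
                connected.add(edge[0]);
                connected.add(edge[1]);
            }
        }
        List<Long> result = new ArrayList<>();
        for (Long vertex : vertexIds) {
            if (!connected.contains(vertex)) {
                result.add(vertex);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> vertices = Arrays.asList(1L, 2L, 3L, 4L);
        List<long[]> edges = Arrays.asList(new long[] {1L, 2L}, new long[] {2L, 3L});
        System.out.println(isolated(vertices, edges));
    }
}
```

Keeping only the connected vertices is the complement of this set, so the same membership test can be used with the condition inverted.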

-- 
This message and its attachments may contain legally privileged or 
confidential information. It is intended solely for the named addressee. If 
you are not the addressee indicated in this message or responsible for 
delivery of the message to the addressee, you may not copy or deliver this 
message or its attachments to anyone. Rather, you should permanently delete 
this message and its attachments and kindly notify the sender by reply 
e-mail. Any content of this message and its attachments which does not 
relate to the official business of the sending company must be taken not to 
have been sent or endorsed by that company or any of its related entities. 
No warranty is made that the e-mail or attachments are free from computer 
virus or other defect.


Re: looking for an easy way to count number of rows in JavaDStream

2016-01-28 Thread Andy Davidson
Forgot to mention: the reason I want the count is so that I can repartition
my data so that, when I save it to disk, each file has at least 100 rows
instead of lots of smaller files.

Kind regards

Andy
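
The arithmetic behind that idea can be sketched as follows. This is a minimal illustration, not tested against Spark: `RepartitionMath` and `partitionsFor` are hypothetical names, and it assumes rows end up spread roughly evenly across partitions. Given a row count (for example from `rdd.count()`), it picks the largest partition count that still leaves about `minRowsPerFile` rows per partition. The result would then be passed to something like `rdd.repartition(n)` before saving.

```java
// Hypothetical helper: chooses a partition count from a row count.
final class RepartitionMath {

    /**
     * Returns the largest partition count (at least 1) for which an even split of
     * rowCount rows leaves every partition with at least minRowsPerFile rows.
     */
    static int partitionsFor(long rowCount, long minRowsPerFile) {
        if (minRowsPerFile <= 0) {
            throw new IllegalArgumentException("minRowsPerFile must be positive");
        }
        long partitions = rowCount / minRowsPerFile; // floor division
        // Never return 0 partitions, and stay within int range.
        return (int) Math.max(1L, Math.min(partitions, (long) Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        System.out.println(partitionsFor(1050L, 100L));
    }
}
```

Because the split is only approximately even in practice, individual files may still come out slightly under the target.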

From:  Andrew Davidson 
Date:  Thursday, January 28, 2016 at 6:41 PM
To:  "user @spark" 
Subject:  looking for an easy way to count number of rows in JavaDStream

> There must be any easy way to count the number of rows in JavaDStream.
> 
> 
> JavaDStream words;
> 
> JavaDStream hardToUse = words();
> 
> 
> 
> 
> 
> JavaDStream does not seem to have a collect().
> 
> 
> 
> The following works but is very clumsy
> 
> 
> 
> Any suggestions would be greatly appreciated
> 
> 
> 
> Andy
> 
> 
> 
> public class JavaDStreamCount implements Serializable {
> 
> private static final long serialVersionUID = -3600586183332429887L;
> 
> public static Logger logger =
> LoggerFactory.getLogger(JavaDStreamCount.class);
> 
> 
> 
> /**
> 
>  * TODO in 1.6 should be able to use a lambda function
> 
>  * @see https://issues.apache.org/jira/browse/SPARK-4557
> 
>  * @param total
> 
>  * @param javaDStream
> 
>  * @return
> 
>  */
> 
> @Deprecated
> 
> public Double hack(Accumulator total, JavaDStream javaDStream)
> {
> 
> Count c = new Count(total);
> 
> javaDStream.foreachRDD(c);
> 
> return c.getTotal().value();
> 
> }
> 
> 
> 
> class Count implements Function {
> 
> private static final long serialVersionUID = -5239727633710162488L;
> 
> private Accumulator total;
> 
> 
> 
> public Count(Accumulator total) {
> 
> this.total = total;
> 
> }
> 
> 
> 
> @Override
> 
> public java.lang.Void call(JavaRDD rdd) throws Exception {
> 
> List data = rdd.collect();
> 
> int dataSize = data.size();
> 
> logger.info("Accumulator name:{} data.size:{}", total.name(),
> dataSize);
> 
> long num = rdd.count();
> 
> logger.info("num:{}", num);
> 
> total.add(new Double(num));
> 
> return null;
> 
> }
> 
> 
> 
> public Accumulator getTotal() {
> 
> return total;
> 
> }
> 
> 
> 
> }
> 
> }
> 
> 
> 
> 




Spark 1.5.2 - Programmatically launching spark on yarn-client mode

2016-01-28 Thread Nirav Patel
Hi, we were using Spark 1.3.1 and launching our Spark jobs in yarn-client
mode programmatically by creating the SparkConf and SparkContext objects
manually. This was inspired by the Spark self-contained application example
here:

https://spark.apache.org/docs/1.5.2/quick-start.html#self-contained-applications

The only additional configuration we provide is YARN-related, such as
executor instances, cores, memory, extraJavaOptions, etc.

However, after upgrading to Spark 1.5.2 the above application breaks on the
line `val sparkContext = new SparkContext(sparkConf)`:

16/01/28 17:38:35 ERROR util.Utils: Uncaught exception in thread main
java.lang.NullPointerException
    at org.apache.spark.network.netty.NettyBlockTransferService.close(NettyBlockTransferService.scala:152)
    at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
    at org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1749)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1748)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:593)


*In the YARN container logs I see the following:*

16/01/28 17:38:29 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
*Unknown/unsupported param List*(--properties-file,
/tmp/hadoop-xactly/nm-local-dir/usercache/xactly/appcache/application_1453752281504_3427/container_1453752281504_3427_01_02/__spark_conf__/__spark_conf__.properties)

Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
Options:
  --jar JAR_PATH         Path to your application's JAR file
  --class CLASS_NAME     Name of your application's main class
  --primary-py-file      A main Python file
  --py-files PY_FILES    Comma-separated list of .zip, .egg, or .py files to
                         place on the PYTHONPATH for Python apps.
  --args ARGS            Arguments to be passed to your application's main class.
                         Multiple invocations are possible, each will be
                         passed in order.
  --num-executors NUM    Number of executors to start (Default: 2)
  --executor-cores NUM   Number of cores for the executors (Default: 1)
  --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)



So is this approach still supposed to work? Or must I use the SparkLauncher
class with Spark 1.5.2?

Thanks

Nirav



Re: Programmatically launching spark on yarn-client mode no longer works in spark 1.5.2

2016-01-28 Thread Saisai Shao
I think I have hit this problem before. It might be caused by a race
condition during shutdown. The approach you describe is still valid; this
problem only occurs while the application is stopping.

Thanks
Saisai

On Fri, Jan 29, 2016 at 10:22 AM, Nirav Patel  wrote:

> Hi, we were using spark 1.3.1 and launching our spark jobs on yarn-client
> mode programmatically via creating a sparkConf and sparkContext object
> manually. It was inspired from spark self-contained application example
> here:
>
>
> https://spark.apache.org/docs/1.5.2/quick-start.html#self-contained-applications\
>
>
>
> Only additional configuration we would provide would be all related to
> yarn like executor instance, cores etc.
>
> However after upgrading to spark 1.5.2 above application breaks on a line `
> val sparkContext = new SparkContext(sparkConf)`
>
> 16/01/28 17:38:35 ERROR util.Utils: Uncaught exception in thread main
>
> java.lang.NullPointerException
>
> at
> org.apache.spark.network.netty.NettyBlockTransferService.close(NettyBlockTransferService.scala:152)
>
> at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
>
> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
>
> at
> org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1749)
>
> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
>
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1748)
>
> at org.apache.spark.SparkContext.(SparkContext.scala:593)
>
>
> So is this approach still supposed to work? Or do I must use SparkLauncher
> class with spark 1.5.2?
>
>
> Thanks
>
>
>


Re: Getting Exceptions/WARN during random runs for same dataset

2016-01-28 Thread Ted Yu
Did the UnsupportedOperationExceptions come from the executors on all the
nodes or only from one node?

Thanks

On Thu, Jan 28, 2016 at 5:13 PM, Khusro Siddiqui  wrote:

> Hi Everyone,
>
> Environment used: Datastax Enterprise 4.8.3 which is bundled with Spark
> 1.4.1 and scala 2.10.5.
>
> I am using Dataframes to query Cassandra, do processing and store the
> result back into Cassandra. The job is being submitted using spark-submit
> on a cluster of 3 nodes. While doing so I get three WARN messages:
>
> WARN  2016-01-28 19:08:18 org.apache.spark.scheduler.TaskSetManager: Lost
> task 99.0 in stage 2.0 (TID 107, 10.2.1.82): java.io.InvalidClassException:
> org.apache.spark.sql.types.TimestampType$; unable to create instance
>
> Caused by: java.lang.reflect.InvocationTargetException
>
> Caused by: java.lang.UnsupportedOperationException: tail of empty list
>
>
> For example, if I am running the same job, for the same input set of data,
> say 20 times,
>
> - 11 times it will run successfully without any WARN messages
>
> - 4 times it will run successfully with the above messages
>
> - 6 times it will run successfully by randomly giving one or two of
> the exceptions above
>
>
> In all the 20 runs, the output data is coming as expected and there is no
> error in that. My concern is, why is it not giving these messages every
> time I do a spark-submit but only at times. Also, the stack trace does not
> point to any specific point in my line of code. Full stack trace is as
> follows. Please let me know if you need any other information
>
>
> WARN  2016-01-28 19:08:24 org.apache.spark.scheduler.TaskSetManager: Lost
> task 188.0 in stage 16.0 (TID 637, 10.2.1.82):
> java.io.InvalidClassException: org.apache.spark.sql.types.TimestampType$;
> unable to create instance
>
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1788)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at 

looking for an easy way to count number of rows in JavaDStream

2016-01-28 Thread Andy Davidson
There must be an easy way to count the number of rows in a JavaDStream.


JavaDStream words;

JavaDStream hardToUse = words();





JavaDStream does not seem to have a collect().



The following works but is very clumsy



Any suggestions would be greatly appreciated



Andy



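import java.io.Serializable;
import java.util.List;

import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Note: the generic type parameters were lost in the archived message.
// <Double> follows from total.add(new Double(num)); <String> is an assumption
// based on the stream being named "words". The slf4j imports are likewise
// assumed from the LoggerFactory.getLogger(...) call.
public class JavaDStreamCount implements Serializable {
    private static final long serialVersionUID = -3600586183332429887L;
    public static Logger logger = LoggerFactory.getLogger(JavaDStreamCount.class);

    /**
     * TODO in 1.6 should be able to use a lambda function
     * @see https://issues.apache.org/jira/browse/SPARK-4557
     * @param total
     * @param javaDStream
     * @return
     */
    @Deprecated
    public Double hack(Accumulator<Double> total, JavaDStream<String> javaDStream) {
        Count c = new Count(total);
        javaDStream.foreachRDD(c);
        return c.getTotal().value();
    }

    class Count implements Function<JavaRDD<String>, Void> {
        private static final long serialVersionUID = -5239727633710162488L;
        private Accumulator<Double> total;

        public Count(Accumulator<Double> total) {
            this.total = total;
        }

        @Override
        public java.lang.Void call(JavaRDD<String> rdd) throws Exception {
            // collect() pulls every row to the driver just to log the size
            List<String> data = rdd.collect();
            int dataSize = data.size();
            logger.info("Accumulator name:{} data.size:{}", total.name(), dataSize);

            // count() computes the same number without collecting the rows
            long num = rdd.count();
            logger.info("num:{}", num);
            total.add(new Double(num));
            return null;
        }

        public Accumulator<Double> getTotal() {
            return total;
        }
    }
}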








Re: TTransportException when using Spark 1.6.0 on top of Tachyon 0.8.2

2016-01-28 Thread Calvin Jia
Hi,

Thanks for the detailed information. How large is the dataset you are 
running against? Also did you change any Tachyon configurations?

Thanks,
Calvin


Re: Dataframe, Spark SQL - Drops First 8 Characters of String on Amazon EMR

2016-01-28 Thread Andrew Zurn
Hey Daniel,

Thanks for the response.

After playing around for a bit, it looks like it's probably something
similar to the first situation you mentioned, with the Parquet format
causing issues. Both a programmatically created dataset and a dataset pulled
off the internet (rather than out of S3 and put into HDFS/Hive) behaved as
one would expect with DataFrames (printed out everything, grouped properly,
etc.).

It looks like there is more than likely an outstanding bug that causes
issues with data that comes from S3 and is converted to the Parquet format.
I found an article highlighting that it was around in 1.4, and I guess it
wouldn't be out of the realm of possibility for it to still exist. Link to
the article:
https://www.appsflyer.com/blog/the-bleeding-edge-spark-parquet-and-s3/

Hopefully a little more stability will come with the upcoming Spark 1.6
release on EMR (I think that is happening sometime soon).

Thanks again for the advice on where to dig further. Much appreciated.

Andrew

On Tue, Jan 26, 2016 at 9:18 AM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> Have you tried setting spark.emr.dropCharacters to a lower value? (It
> defaults to 8.)
>
> :) Just joking, sorry! Fantastic bug.
>
> What data source do you have for this DataFrame? I could imagine for
> example that it's a Parquet file and on EMR you are running with the wrong
> version of the Parquet library and it messes up strings. It should be easy
> enough to try a different data format. You could also try what happens if
> you just create the DataFrame programmatically, e.g.
> sc.parallelize(Seq("asdfasdfasdf")).toDF.
>
> To understand better at which point the characters are lost you could try
> grouping by a string attribute. I see "education" ends up either as ""
> (empty string) or "y" in the printed output. But are the characters already
> lost when you try grouping by the attribute? Will there be a single ""
> category, or will you have separate categories for "primary" and "tertiary"?
>
> I think the correct output through the RDD suggests that the issue happens
> at the very end. So it will probably happen also with different data
> sources, and grouping will create separate groups for "primary" and
> "tertiary" even though they are printed as the same string at the end. You
> should also check the data from "take(10)" to rule out any issues with
> printing. You could try the same "groupBy" trick after "take(10)". Or you
> could print the lengths of the strings.
>
> Good luck!
>
> On Tue, Jan 26, 2016 at 3:53 AM, awzurn  wrote:
>
>> Sorry for the bump, but wondering if anyone else has seen this before.
>> We're
>> hoping to either resolve this soon, or move on with further steps to move
>> this into an issue.
>>
>> Thanks in advance,
>>
>> Andrew Zurn
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-Spark-SQL-Drops-First-8-Characters-of-String-on-Amazon-EMR-tp26022p26065.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark 1.5.2 - Programmatically launching spark on yarn-client mode

2016-01-28 Thread Ted Yu
Looks like '--properties-file' is no longer supported.

Is it possible that a Spark 1.3.1 artifact or dependency leaked into your
app?

Cheers
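
One generic way to check for such a leak is to ask the JVM which jar a class was actually loaded from. The sketch below uses only standard Java reflection. The class `WhereLoaded` and its `describe` method are hypothetical names, and it would need to be run with the same class path as the application for the answer to mean anything.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where a class was loaded from.
final class WhereLoaded {

    /** Returns the jar or directory the named class came from, or a note if it has none or is absent. */
    static String describe(String className) {
        try {
            // 'false' avoids running static initializers of the inspected class.
            Class<?> cls = Class.forName(className, false, WhereLoaded.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes that ship with the JDK usually have no code source.
            if (source == null || source.getLocation() == null) {
                return "(JDK / bootstrap class path)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on the class path)";
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "org.apache.spark.SparkContext";
        System.out.println(name + " -> " + describe(name));
    }
}
```

If the printed location points at a 1.3.1 jar while the cluster runs 1.5.2, that would be consistent with the mismatch suggested above.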

On Thu, Jan 28, 2016 at 7:36 PM, Nirav Patel  wrote:

> Hi, we were using spark 1.3.1 and launching our spark jobs on yarn-client
> mode programmatically via creating a sparkConf and sparkContext object
> manually. It was inspired from spark self-contained application example
> here:
>
>
> https://spark.apache.org/docs/1.5.2/quick-start.html#self-contained-applications\
>
> Only additional configuration we would provide would be all related to
> yarn like executor instance, cores, memory, extraJavaOptions etc.
>
> However after upgrading to spark 1.5.2 above application breaks on a line
> `val sparkContext = new SparkContext(sparkConf)`
>
> 16/01/28 17:38:35 ERROR util.Utils: Uncaught exception in thread main
>
> java.lang.NullPointerException
>
> at
> org.apache.spark.network.netty.NettyBlockTransferService.close(NettyBlockTransferService.scala:152)
>
> at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
>
> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
>
> at
> org.apache.spark.SparkContext$$anonfun$stop$12.apply$mcV$sp(SparkContext.scala:1749)
>
> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
>
> at org.apache.spark.SparkContext.stop(SparkContext.scala:1748)
>
> at org.apache.spark.SparkContext.(SparkContext.scala:593)
>
>
> *In yarn container logs I see following:*
>
> 16/01/28 17:38:29 INFO yarn.ApplicationMaster: Registered signal handlers for 
> [TERM, HUP, INT]*Unknown/unsupported param List*(--properties-file, 
> /tmp/hadoop-xactly/nm-local-dir/usercache/xactly/appcache/application_1453752281504_3427/container_1453752281504_3427_01_02/__spark_conf__/__spark_conf__.properties)
>
> Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
> Options:
>   --jar JAR_PATH   Path to your application's JAR file
>   --class CLASS_NAME   Name of your application's main class
>   --primary-py-fileA main Python file
>   --py-files PY_FILES  Comma-separated list of .zip, .egg, or .py files to
>place on the PYTHONPATH for Python apps.
>   --args ARGS  Arguments to be passed to your application's main 
> class.
>Multiple invocations are possible, each will be passed 
> in order.
>   --num-executors NUMNumber of executors to start (Default: 2)
>   --executor-cores NUM   Number of cores for the executors (Default: 1)
>   --executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G)
>
>
>
> So is this approach still supposed to work? Or do I must use SparkLauncher
> class with spark 1.5.2?
>
> Thanks
>
> Nirav
>
>
>
>


Re: Spark streaming flow control and back pressure

2016-01-28 Thread Iulian Dragoș
Calling `store` should get you there. What version of Spark are you using?
Can you share your code?

iulian

On Thu, Jan 28, 2016 at 2:28 AM, Lin Zhao  wrote:

> I have an actor receiver that reads data and calls "store()" to save data
> to spark. I was hoping spark.streaming.receiver.maxRate and
> spark.streaming.backpressure would help me block the method when needed to
> avoid overflowing the pipeline. But it doesn't. My actor pumps millions of
> lines to spark when backpressure and the rate limit is in effect. Whereas
> these data is slow flowing into the input blocks, the data created sits
> around and creates memory problem.
>
> Is there guideline how to handle this? What's the best way for my actor to
> know it should slow down so it doesn't keep creating millions of messages?
> Blocking store() call seems aptable.
>
> Thanks, Lin
>



-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re:Hive on Spark knobs

2016-01-28 Thread Todd
Did you run hive on spark with spark 1.5 and hive 1.1?
I think hive on spark doesn't support spark 1.5. There are compatibility issues.




At 2016-01-28 01:51:43, "Ruslan Dautkhanov"  wrote:

https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started


There are quite a lot of knobs to tune for Hive on Spark.


Above page recommends following settings:



mapreduce.input.fileinputformat.split.maxsize=75000
hive.vectorized.execution.enabled=true
hive.cbo.enable=true
hive.optimize.reducededuplication.min.reducer=4
hive.optimize.reducededuplication=true
hive.orc.splits.include.file.footer=false
hive.merge.mapfiles=true
hive.merge.sparkfiles=false
hive.merge.smallfiles.avgsize=1600
hive.merge.size.per.task=25600
hive.merge.orcfile.stripe.level=true
hive.auto.convert.join=true
hive.auto.convert.join.noconditionaltask=true
hive.auto.convert.join.noconditionaltask.size=894435328
hive.optimize.bucketmapjoin.sortedmerge=false
hive.map.aggr.hash.percentmemory=0.5
hive.map.aggr=true
hive.optimize.sort.dynamic.partition=false
hive.stats.autogather=true
hive.stats.fetch.column.stats=true
hive.vectorized.execution.reduce.enabled=false
hive.vectorized.groupby.checkinterval=4096
hive.vectorized.groupby.flush.percent=0.1
hive.compute.query.using.stats=true
hive.limit.pushdown.memory.usage=0.4
hive.optimize.index.filter=true
hive.exec.reducers.bytes.per.reducer=67108864
hive.smbjoin.cache.rows=1
hive.exec.orc.default.stripe.size=67108864
hive.fetch.task.conversion=more
hive.fetch.task.conversion.threshold=1073741824
hive.fetch.task.aggr=false
mapreduce.input.fileinputformat.list-status.num-threads=5
spark.kryo.referenceTracking=false
spark.kryo.classesToRegister=org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
Did it work for everybody? It may take days if not weeks to try to tune all of 
these parameters for a specific job. 

We're on Spark 1.5 / Hive 1.1.


P.S. We have a job that we can't get to work well as a Hive job (a 3-table 
full outer join with group by + collect_list), so we thought we'd use 
Hive on Spark instead. Spark should handle this much better.


Ruslan




Re: spark.kryo.classesToRegister

2016-01-28 Thread Jim Lohse
You are only required to add classes to Kryo (compulsorily) if you use a 
specific setting:


// require registration of all classes with Kryo
.set("spark.kryo.registrationRequired", "true")

Here's an example of my setup. I think this is the best approach because 
it forces me to really think about what I am serializing:


import org.apache.spark.SparkConf;

// The Kryo serializer needs every class that will be serialized to be registered
Class[] kryoClassArray = new Class[]{DropResult.class,
    DropEvaluation.class, PrintHetSharing.class};

SparkConf sparkConf = new SparkConf()
    .setAppName("MyAppName")
    .setMaster("spark://ipaddress:7077")
    // now for the Kryo stuff
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // require registration of all classes with Kryo
    .set("spark.kryo.registrationRequired", "true")
    // don't forget to register ALL classes, or you will get an error
    .registerKryoClasses(kryoClassArray);





On 01/27/2016 12:58 PM, Shixiong(Ryan) Zhu wrote:
It depends. The default Kryo serializer cannot handle all cases. If 
you encounter any issue, you can follow the Kryo doc to set up a custom 
serializer: 
https://github.com/EsotericSoftware/kryo/blob/master/README.md

On Wed, Jan 27, 2016 at 3:13 AM, amit tewari wrote:


This is what I have added in my code:

rdd.persist(StorageLevel.MEMORY_ONLY_SER())

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");

Do I need to do anything via spark.kryo.classesToRegister?

Or is the above code sufficient to achieve a performance gain using
Kryo serialization?

Thanks

Amit



Re: Tips for Spark's Random Forest slow performance

2016-01-28 Thread Alexander Ratnikov
Coming back to this, I believe I found some reasons.
Basically, the main logic sits inside ProbabilisticClassificationModel.
It has a transform method which takes a DataFrame (the vector to classify)
and appends to it some UDFs which actually do the prediction.

The thing is that this DataFrame execution is not local.
This makes it different from the old mllib API, which doesn't send anything to
the executors and just evaluates the model in the driver's memory, which
makes it fast.
With the new ml API it basically needs to serialize/deserialize
RandomForestClassificationModel on each prediction, which makes it notably
slow even for my small model (around 100 MB).
There's something in JIRA regarding the topic:
https://issues.apache.org/jira/browse/SPARK-10014

I believe there should be a way to cache the model and avoid rebroadcasting,
but I didn't manage to find it.
Could you advise me on how to do it?
Btw, I made a local fix and I'm happy to provide a pull request, but first
I'd like to make sure I'm not missing the right way.


2015-12-26 2:03 GMT+01:00 Chris Fregly :

> ah, so with that much serialization happening, you might actually need
> *less* workers!  :)
>
> in the next couple of releases of Spark ML, we should see better
> scoring/predicting functionality using a single node for exactly this
> reason.
>
> to get there, we need model.save/load support (PMML?), optimized
> single-node linear algebra support, and a few other goodies.
>
> useNodeIdCache only affects training.
>
> btw, are you checkpointing per this
> ?
>  (code snippet from DecisionTreeParams copy/pasted below for convenience)
>
> /**
>  * Specifies how often to checkpoint the cached node IDs.
>  * E.g. 10 means that the cache will get checkpointed every 10 iterations.
>  * This is only used if cacheNodeIds is true and if the checkpoint directory
>  * is set in [[org.apache.spark.SparkContext]].
>  * Must be >= 1.
>  * (default = 10)
>  * @group expertSetParam
>  */
> def setCheckpointInterval(value: Int): this.type =
>   set(checkpointInterval, value)
>
> i'm not actually sure how this will affect training performance with the
> new ml.RandomForest impl, but i'm curious to hear what you find.
>
>
> On Fri, Dec 25, 2015 at 6:03 PM, Alexander Ratnikov <
> ratnikov.alexan...@gmail.com> wrote:
>
>> Definitely the biggest difference is the maxDepth of the trees. With
>> values smaller than or equal to 5, the time falls into the millisecond range.
>> The number of trees affects the performance, but not that much.
>> I tried to profile the app and I see decent time spent in serialization.
>> I'm wondering if Spark isn't somehow caching model on workers during
>> classification ?
>>
>> useNodeIdCache is ON but docs aren't clear if Spark is using it only
>> on training.
>> Also, I must say we didn't have this problem in the old mllib API so
>> it might be something in the new ml that I'm missing.
>> I will dig deeper into the problem after holidays.
>>
>> 2015-12-25 16:26 GMT+01:00 Chris Fregly :
>> > so it looks like you're increasing num trees by 5x and you're seeing an
>> > 8x increase in runtime, correct?
>> >
>> > did you analyze the Spark cluster resources to monitor the memory usage,
>> > spillage, disk I/O, etc?
>> >
>> > you may need more Workers.
>> >
>> > On Tue, Dec 22, 2015 at 8:57 AM, Alexander Ratnikov
>> >  wrote:
>> >>
>> >> Hi All,
>> >>
>> >> It would be good to get some tips on tuning Apache Spark for Random
>> >> Forest classification.
>> >> Currently, we have a model that looks like:
>> >>
>> >> featureSubsetStrategy all
>> >> impurity gini
>> >> maxBins 32
>> >> maxDepth 11
>> >> numberOfClasses 2
>> >> numberOfTrees 100
>> >>
>> >> We are running Spark 1.5.1 as a standalone cluster.
>> >>
>> >> 1 Master and 2 Worker nodes.
>> >> The amount of RAM is 32GB on each node with 4 Cores.
>> >> The classification takes 440ms.
>> >>
>> >> When we increase the number of trees to 500, it takes 8 sec already.
>> >> We tried to reduce the depth but then error rate is higher. We have
>> >> around 246 attributes.
>> >>
>> >> Probably we are doing something wrong. Any ideas how we could improve
>> >> the performance ?
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> >> http://apache-spark-user-list.1001560.n3.nabble.com/Tips-for-Spark-s-Random-Forest-slow-performance-tp25766.html
>> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >>
>> >> -
>> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> For additional commands, e-mail: user-h...@spark.apache.org
>> >>
>> >
>> >
>> >
>> > --
>> >
>> > Chris Fregly
>> > Principal Data Solutions Engineer
>> > IBM Spark Technology Center, San Francisco, CA
>> > http://spark.tc | http://advancedspark.com
>>
>

Re: Having issue with Spark SQL JDBC on hive table !!!

2016-01-28 Thread @Sanjiv Singh
It is working now.

I checked the Spark worker UI: executor startup was failing with the error
below, with JVM initialization failing because of a wrong -Xms value:

Invalid initial heap size: -Xms0M
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

The Thrift server was not picking up the executor memory from *spark-env.sh*,
so I added it explicitly in the Thrift server startup script.

*./sbin/start-thriftserver.sh*

exec "$FWDIR"/sbin/spark-daemon.sh spark-submit $CLASS 1 --executor-memory 512M "$@"

With this, the executors start with a valid memory setting and JDBC queries
return results.

*conf/spark-env.sh* (executor memory settings not picked up by the
Thrift server)

export SPARK_JAVA_OPTS="-Dspark.executor.memory=512M"
export SPARK_EXECUTOR_MEMORY=512M


Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Thu, Jan 28, 2016 at 10:57 PM, @Sanjiv Singh 
wrote:

> Adding to it, the job status in the UI:
>
> Stage Id: 1
> Description: select ename from employeetest (collect at SparkPlan.scala:84)
> Submitted: 2016/01/29 04:20:06
> Duration: 3.0 min
> Tasks (Succeeded/Total): 0/2
>
> I am getting the exception below in the Spark UI:
>
> org.apache.spark.rdd.RDD.collect(RDD.scala:813)
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
> org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
> org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.run(Shim13.scala:178)
> org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:231)
> org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:218)
> org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:233)
> org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:344)
> org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1313)
> org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1298)
> org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
> org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
> org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:55)
> org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:744)
>
>
> Regards
> Sanjiv Singh
> Mob :  +091 9990-447-339
>
> On Thu, Jan 28, 2016 at 9:57 PM, @Sanjiv Singh 
> wrote:
>
>> Any help on this.
>>
>> Regards
>> Sanjiv Singh
>> Mob :  +091 9990-447-339
>>
>> On Wed, Jan 27, 2016 at 10:25 PM, @Sanjiv Singh 
>> wrote:
>>
>>> Hi Ted ,
>>> Its typo.
>>>
>>>
>>> Regards
>>> Sanjiv Singh
>>> Mob :  +091 9990-447-339
>>>
>>> On Wed, Jan 27, 2016 at 9:13 PM, Ted Yu  wrote:
>>>
>>>> In the last snippet, temptable is shown by 'show tables' command.
>>>> Yet you queried tampTable.
>>>>
>>>> I believe this just was typo :-)
>>>>
>>>> On Wed, Jan 27, 2016 at 7:07 AM, @Sanjiv Singh 
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have configured Spark to query on hive table.
>>>>>
>>>>> Run the Thrift JDBC/ODBC server using below command :
>>>>>
>>>>> *cd $SPARK_HOME*
>>>>> *./sbin/start-thriftserver.sh --master spark://myhost:7077 --hiveconf
>>>>> hive.server2.thrift.bind.host=myhost --hiveconf
>>>>> hive.server2.thrift.port=*
>>>>>
>>>>> and also able to connect through beeline
>>>>>
>>>>> *beeline>* !connect jdbc:hive2://192.168.145.20:
>>>>> Enter username for jdbc:hive2://192.168.145.20:: root
>>>>> Enter password for jdbc:hive2://192.168.145.20:: impetus
>>>>> *beeline > *
>>>>>
>>>>> It is not giving query result on hive table through Spark JDBC, but it
>>>>> is working with spark HiveSQLContext. See complete scenario explain below.
>>>>>
>>>>> Help me understand the issue why Spark SQL JDBC is not giving result ?
>>>>>
>>>>> Below are version details.
>>>>>
>>>>> *Hive Version  : 1.2.1*
>>>>> *Hadoop Version :  2.6.0*
>>>>> *Spark version:  1.3.1*
>>>>>
>>>>> Let me know if need other details.
>>>>>
>>>>>
>>>>> *Created Hive Table , insert some records and query it :*
>>>>>
>>>>> *beeline> !connect jdbc:hive2://myhost:1*
>>>>> Enter username for jdbc:hive2://myhost:1: root
>>>>> Enter password for jdbc:hive2://myhost:1: **
>>>>> *beeline> create table tampTable(id int ,name string ) clustered by
>>>>> (id) into 2 buckets stored as orc TBLPROPERTIES('transactional'='true');*
>>>>> *beeline> insert into table tampTable values
>>>>>

Spark streaming and ThreadLocal

2016-01-28 Thread N B
Hello,

Does anyone know if there are any potential pitfalls associated with using
ThreadLocal variables in a Spark streaming application? One thing I have
seen mentioned in the context of app servers that use thread pools is that
ThreadLocals can leak memory. Could this happen in Spark streaming also?

Thanks
Nikunj
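
A minimal plain-Java sketch of the pooled-thread pitfall the question refers to, not Spark-specific code: a value set in a ThreadLocal by one task is still visible to the next task that runs on the same worker thread unless it is removed. ThreadLocalReuse, CONTEXT, and secondTaskSees are hypothetical names chosen for this sketch. Spark executors also run tasks on pooled threads, so the same pattern is worth checking there, though this sketch does not exercise Spark itself.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: ThreadLocal state surviving between tasks on a pooled thread.
final class ThreadLocalReuse {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Runs two tasks on a single pooled thread and returns what the second task sees. */
    static String secondTaskSees(boolean cleanUp) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Runnable first = () -> {
                CONTEXT.set("state from task 1");
                try {
                    // ... work that reads CONTEXT would go here ...
                } finally {
                    if (cleanUp) {
                        CONTEXT.remove(); // drop the value before the thread is reused
                    }
                }
            };
            Callable<String> second = CONTEXT::get;

            pool.submit(first).get();         // wait for task 1 to finish
            return pool.submit(second).get(); // task 2 runs on the same thread
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without remove(): " + secondTaskSees(false));
        System.out.println("with remove():    " + secondTaskSees(true));
    }
}
```

Run as is, the first line should report the stale value left by task 1 and the second should report null. Calling remove() in a finally block is the usual way to keep per-task state from outliving the task.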


Visualization of KMeans cluster in Spark

2016-01-28 Thread Yogesh Vyas
Hi,

Is there any way to visualize the KMeans clusters in Spark?
Can we connect Plotly with Apache Spark in Java?

Thanks,
Yogesh
