1.4.1 in production

2015-07-20 Thread igor.berman
Hi,
does somebody already use version 1.4.1 in production? Any problems?
thanks in advance



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/1-4-1-in-production-tp23909.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: use S3-Compatible Storage with spark

2015-07-20 Thread Akhil Das
Not in the URI, but you can specify it in the Hadoop configuration:

<property>
  <name>fs.s3a.endpoint</name>
  <description>AWS S3 endpoint to connect to. An up-to-date list is
    provided in the AWS Documentation: regions and endpoints. Without this
    property, the standard region (s3.amazonaws.com) is assumed.
  </description>
</property>
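
The same endpoint can also be set programmatically on the SparkContext's Hadoop configuration; a minimal sketch (assuming an existing SparkContext sc, the s3a connector on the classpath, and placeholder endpoint/credential/bucket values):

// placeholders: endpoint, keys and bucket below are illustrative, not real values
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.example.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

val data = sc.textFile("s3a://my-bucket/some/path")
data.count()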


Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com
wrote:

 I want to use pithos, were do I can specify that endpoint, is it
 possible in the url ?

 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
  Could you name the storage service that you are using? Most of them
  provide an S3-like REST API endpoint for you to hit.
 
  Thanks
  Best Regards
 
  On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com
  wrote:
 
  Hi,
 
  I wonder how to use S3-compatible storage in Spark?
  If I'm using the s3n:// url scheme, then it will point to Amazon; is there
  a way I can specify the host somewhere?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: how to start reading the spark source code?

2015-07-20 Thread Yang
ok got some headstart:

pull the git source to commit 14719b93ff4ea7c3234a9389621be3c97fa278b9 (the first
release, so that I could at least build it)

then build it according to README.md,
then get Eclipse set up with the Scala IDE,
then create a new Scala project and set the project directory to
SCALA_SOURCE_HOME/core instead of the default

in Eclipse, remove the tests from the source,

copy all the jars from SCALA_SOURCE_HOME/lib_managed into a separate dir,
then in Eclipse add all of these as external jars.

set your Scala project runtime to 2.10.5 (the one coming with Spark seems
to be 2.10.4; the Eclipse default is 2.9 something)
there will be 2 compile errors: one due to Tuple(), change it to Tuple2;
the other is currentThread, change it to Thread.currentThread()

then it will build fine

I pasted the hello-world from the docs; since the getting-started doc is for
the latest version, I had to make some minor changes:



package spark

import spark.SparkContext
import spark.SparkContext._

object Tryout {
  def main(args: Array[String]) {
    val logFile = "../README.md" // Should be some file on your system
    val sc = new SparkContext("local", "tryout", ".",
      List(System.getenv("SPARK_EXAMPLES_JAR")))
    val logData = sc.textFile(logFile, 2).cache()

    // val logData = scala.io.Source.fromFile(args(0)).getLines().toArray

    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}




then I debugged through this and it became fairly clear

On Sun, Jul 19, 2015 at 10:13 PM, Yang tedd...@gmail.com wrote:

 thanks, my point is that earlier versions are normally much simpler so
 it's easier to follow, and the basic structure should at least bear great
 similarity with the latest version

 On Sun, Jul 19, 2015 at 9:27 PM, Ted Yu yuzhih...@gmail.com wrote:

 e5c4cd8a5e188592f8786a265 was from 2011.

 Not sure why you started with such an early commit.

 Spark project has evolved quite fast.

 I suggest you clone Spark project from github.com/apache/spark/ and
 start with core/src/main/scala/org/apache/spark/rdd/RDD.scala

 Cheers

 On Sun, Jul 19, 2015 at 7:44 PM, Yang tedd...@gmail.com wrote:

 I'm trying to understand how spark works under the hood, so I tried to
 read the source code.

 as I normally do, I downloaded the git source code, reverted to the very
 first version ( actually e5c4cd8a5e188592f8786a265c0cd073c69ac886 since the
 first version even lacked the definition of RDD.scala)

 but the code looks too simple and I can't find where the magic
 happens, i.e. a transformation /computation is scheduled on  a machine,
 bytes stored etc.

 it would be great if someone could show me a path in which the different
 source files are involved, so that I could read each of them in turn.

 thanks!
 yang






Re: how to start reading the spark source code?

2015-07-20 Thread Yang
also one peculiar difference vs. Hadoop MR is that a partition/split/part
of an RDD is as much an operation as it is data, since an RDD is associated
with a transformation and a lineage of all its ancestor RDDs. So when a
partition is transferred to a new executor/worker (potentially on another
box), the operation definition/code is transferred together with the data
to that new executor, involving serialization and deserialization. This is
something new: if Hadoop MR does this, it needs to transfer the jar file to the
worker, but Spark being Scala, it was easy to transfer the entire operation
(Task[]) through serialization.
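
As a tiny illustration of that point, a sketch (not from the thread; assumes an existing SparkContext sc): the function passed to a transformation is a closure that gets serialized and shipped with each task.

val threshold = 3                          // captured by the closure below
val rdd = sc.parallelize(1 to 10, numSlices = 4)
// the lambda (its code plus the captured threshold) is serialized and sent to
// the executor running each of the 4 tasks, alongside the partition it processes
val kept = rdd.filter(x => x > threshold)
println(kept.count())                      // 7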

On Mon, Jul 20, 2015 at 12:38 AM, Yang tedd...@gmail.com wrote:

 ok got some headstart:

 pull the git source to 14719b93ff4ea7c3234a9389621be3c97fa278b9 (first
 release so that I could at least build it)

 then build it according to README.md,
 then get eclipse setup , with scala-ide
 then create new scala project, set the project directory to be
 SCALA_SOURCE_HOME/core  instead of the default

 in eclipse remove the test from source,

 copy all the jars from SCALA_SOURCE_HOME/lib_managed into a separate dir,
 then in eclipse add all these as external jars.

 set ur scala project run time to be 2.10.5 (the one coming with spark
 seems to be 2.10.4 , eclipse default is 2.9 something)
 there would be 2 compile errors , one due to Tuple() , change it to
 Tuple2, another one is currentThread, change it to Thread.currentThread()

 then it will build fine

 I pasted the hello-world from docs , since the getting started doc is
 for latest version, I had to make some minor changes:



 package spark

 import spark.SparkContext
 import spark.SparkContext._

 object Tryout {
   def main(args: Array[String]) {
     val logFile = "../README.md" // Should be some file on your system
     val sc = new SparkContext("local", "tryout", ".",
       List(System.getenv("SPARK_EXAMPLES_JAR")))
     val logData = sc.textFile(logFile, 2).cache()

     // val logData = scala.io.Source.fromFile(args(0)).getLines().toArray

     val numAs = logData.filter(line => line.contains("a")).count()
     val numBs = logData.filter(line => line.contains("b")).count()
     println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
   }
 }




 then I debug through this and it became fairly clear

 On Sun, Jul 19, 2015 at 10:13 PM, Yang tedd...@gmail.com wrote:

 thanks, my point is that earlier versions are normally much simpler so
 it's easier to follow, and the basic structure should at least bear great
 similarity with the latest version

 On Sun, Jul 19, 2015 at 9:27 PM, Ted Yu yuzhih...@gmail.com wrote:

 e5c4cd8a5e188592f8786a265 was from 2011.

 Not sure why you started with such an early commit.

 Spark project has evolved quite fast.

 I suggest you clone Spark project from github.com/apache/spark/ and
 start with core/src/main/scala/org/apache/spark/rdd/RDD.scala

 Cheers

 On Sun, Jul 19, 2015 at 7:44 PM, Yang tedd...@gmail.com wrote:

 I'm trying to understand how spark works under the hood, so I tried to
 read the source code.

 as I normally do, I downloaded the git source code, reverted to the
 very first version ( actually e5c4cd8a5e188592f8786a265c0cd073c69ac886
 since the first version even lacked the definition of RDD.scala)

 but the code looks too simple and I can't find where the magic
 happens, i.e. a transformation /computation is scheduled on  a machine,
 bytes stored etc.

 it would be great if someone could show me a path in which the
 different source files are involved, so that I could read each of them in
 turn.

 thanks!
 yang







Hive Query(Top N)

2015-07-20 Thread Ravisankar Mani
Hi everyone

I have tried to achieve hierarchical (index mode) top-N creation
using a Spark query. It takes more time when I execute the following query:

Select SUM(`adventurepersoncontacts`.`contactid`)  AS
`adventurepersoncontacts_contactid` ,
`adventurepersoncontacts`.`fullname` AS `adventurepersoncontacts_fullname`
FROM  `default`.`adventurepersoncontacts` AS adventurepersoncontacts
 JOIN (
 SELECT
  `F_0`.`fullname_0_0` AS `fullname_0_0`,
  ROW_NUMBER() OVER( ORDER BY `F_0`.`Measure_0` DESC)  AS `R_N_0`
  FROM(
   SELECT
`adventurepersoncontacts`.`fullname` AS `fullname_0_0`,
SUM(adventurepersoncontacts.contactid) AS `Measure_0`
FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
GROUP BY `adventurepersoncontacts`.`fullname`
   )`F_0`
) `T_0` on ((`adventurepersoncontacts`.`fullname` = `T_0`.`fullname_0_0` )
AND (`T_0`.`R_N_0` <= 5)) GROUP BY `adventurepersoncontacts`.`fullname`

In the mentioned query, I have set a row index in every group according to the
aggregation type. A row-number calculation over an aggregation, like
ROW_NUMBER() OVER (ORDER BY SUM(column_name) DESC), is not directly
supported in Spark, so I am using a subquery like:

 SELECT
  `F_0`.`fullname_0_0` AS `fullname_0_0`,
  ROW_NUMBER() OVER( ORDER BY `F_0`.`Measure_0` DESC)  AS `R_N_0`
  FROM(
   SELECT
`adventurepersoncontacts`.`fullname` AS `fullname_0_0`,
SUM(adventurepersoncontacts.contactid) AS `Measure_0`
FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
GROUP BY `adventurepersoncontacts`.`fullname`
   )`F_0`
) `T_0`

The execution time gets slow when using the following way. In case I
remove the main GROUP BY (i.e. remove the aggregation), execution gets very
fast.
Refer to the following query. I mention this query only to show the slow execution
when I include the main GROUP BY:


 Select  `adventurepersoncontacts`.`contactid`  AS
`adventurepersoncontacts_contactid` ,
`adventurepersoncontacts`.`fullname` AS `adventurepersoncontacts_fullname`
FROM  `default`.`adventurepersoncontacts` AS adventurepersoncontacts
 JOIN (
 SELECT
  `F_0`.`fullname_0_0` AS `fullname_0_0`,
  ROW_NUMBER() OVER( ORDER BY `F_0`.`Measure_0` DESC)  AS `R_N_0`
  FROM(
   SELECT
`adventurepersoncontacts`.`fullname` AS `fullname_0_0`,
SUM(adventurepersoncontacts.contactid) AS `Measure_0`
FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
GROUP BY `adventurepersoncontacts`.`fullname`
   )`F_0`
) `T_0` on ((`adventurepersoncontacts`.`fullname` = `T_0`.`fullname_0_0` )
AND (`T_0`.`R_N_0` <= 5))


I have found another way to get this hierarchical query.

I have created a temp table (the table refers to the inner subquery of the
previous query):

 CREATE external table IF NOT EXISTS  temp_table  AS SELECT
SUM(adventurepersoncontacts.contactid) as contactid,
adventurepersoncontacts.fullname as fullname FROM
`default`.`adventurepersoncontacts` AS adventurepersoncontacts GROUP BY
`adventurepersoncontacts`.`fullname`;



SELECT  SUM(`adventurepersoncontacts`.`contactid`) AS
`adventurepersoncontacts_contactid` ,`adventurepersoncontacts`.`fullname`
AS `adventurepersoncontacts_fullname` FROM
`default`.`adventurepersoncontacts` AS adventurepersoncontacts JOIN (SELECT
`fullname` AS `fullname_0_0`,ROW_NUMBER() over( order by `contactid` desc)
AS `R_N_0` FROM default.temp_table) `T_0` on
((`adventurepersoncontacts`.`fullname` = `T_0`.`fullname_0_0` ) AND
(`T_0`.`R_N_0` <= 2)) GROUP BY `adventurepersoncontacts`.`fullname`;

This is the other way, but its execution time was delayed when creating the
table using the SELECT statement.


Please help with any other optimized way to achieve my requirement.
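
For reference, a sketch of the same aggregate-then-rank pattern written with the DataFrame window API added in Spark 1.4 (assuming a HiveContext, since window functions there require Hive support; table and column names follow the query above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{sum, desc, rowNumber}

// aggregate once, then rank the aggregated rows and keep the top 5
val agg = sqlContext.table("default.adventurepersoncontacts")
  .groupBy("fullname")
  .agg(sum("contactid").as("measure_0"))

val ranked = agg.withColumn("r_n", rowNumber().over(Window.orderBy(desc("measure_0"))))
val top5 = ranked.where(ranked("r_n") <= 5)
top5.show()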


Regards,
Ravi


Re: Exception while triggering spark job from remote jvm

2015-07-20 Thread Akhil Das
Just make sure there is no firewall/network rule blocking the requests, as it's
complaining about a timeout.

Thanks
Best Regards

On Mon, Jul 20, 2015 at 1:14 AM, ankit tyagi ankittyagi.mn...@gmail.com
wrote:

 Just to add more information. I have checked the status of this file, not
 a single block is corrupted.

 *[hadoop@ip-172-31-24-27 ~]$   hadoop fsck /ankit -files -blocks*
 *DEPRECATED: Use of this script to execute hdfs command is deprecated.*
 *Instead use the hdfs command for it.*

 Connecting to namenode via
 http://ip-172-31-24-27.us-west-2.compute.internal:9101
 FSCK started by hadoop (auth:SIMPLE) from /172.31.24.27 for path /ankit
 at Sun Jul 19 19:11:37 UTC 2015
 /ankit dir
 /ankit/SPARKSQLPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar 103599417
 bytes, 1 block(s):  OK
 0. BP-511626939-172.31.24.27-1436185102368:blk_1073741964_126634
 len=103599417 repl=1

 *Status: HEALTHY*
 * Total size: 103599417 B*
 * Total dirs: 1*
 * Total files: 1*
  Total symlinks: 0
  Total blocks (validated): 1 (avg. block size 103599417 B)
  Minimally replicated blocks: 1 (100.0 %)
  Over-replicated blocks: 0 (0.0 %)
  Under-replicated blocks: 0 (0.0 %)
  Mis-replicated blocks: 0 (0.0 %)
  Default replication factor: 1
  Average block replication: 1.0
 * Corrupt blocks: 0*
  Missing replicas: 0 (0.0 %)
  Number of data-nodes: 4
  Number of racks: 1
 FSCK ended at Sun Jul 19 19:11:37 UTC 2015 in 1 milliseconds


 On Mon, Jul 20, 2015 at 12:33 AM, ankit tyagi ankittyagi.mn...@gmail.com
 wrote:

 Hi,

 I am using below code to trigger spark job from remote jvm.

 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.deploy.yarn.Client;
 import org.apache.spark.deploy.yarn.ClientArguments;

 /**
  * @version 1.0, 15-Jul-2015
  * @author ankit
  */

 public class QueryEngineImpl implements IQueryEngine {

 SparkSqlEngine sqlEngine;

 public QueryEngineImpl(SparkSqlEngine sparkSqlEngine) {
 this.sqlEngine = sparkSqlEngine;
 }

 @Override
 public void executeQuery(String query, String resultLocation,
 String... parquetFileLocation) {
 // TODO Auto-generated method stub
 String[] args = new String[] {
 // the name of your application
 "--name",
 "RemoteJVM",

 // memory for driver (optional)
 "--driver-memory",
 "1000M",

 // path to your application's JAR file
 // required in yarn-cluster mode
 "--jar",
 "hdfs://52.24.76.10:9000/ankit/SPARKSQLPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar",

 // name of your application's main class (required)
 "--class",
 "SparkSqlEngine",

 // argument 1 to your Spark program (SparkFriendRecommendation)
 "--arg",
 query,

 // argument 2 to your Spark program (SparkFriendRecommendation)
 "--arg",
 resultLocation,

 // argument 3 to your Spark program (SparkFriendRecommendation)
 "--arg",
 parquetFileLocation[0],

 "--arg",
 "yarn-cluster" };

 Configuration conf = new Configuration();
 conf.set("yarn.resourcemanager.address", "52.24.76.10:9022");
 conf.set("HADOOP_HOME", "/home/hadoop");

 System.setProperty("SPARK_YARN_MODE", "true");
 SparkConf sparkConf = new SparkConf();
 System.out.println("SPARK CONF " + sparkConf.toDebugString());
 // create ClientArguments, which will be passed to Client
 org.apache.spark.deploy.yarn.ClientArguments cArgs = new
 ClientArguments(args, sparkConf);
 // create an instance of the yarn Client
 Client client = new Client(cArgs, conf, sparkConf);

 client.run();
 }

 public static void main(String[] args) {
 QueryEngineImpl impl = new QueryEngineImpl(null);
 impl.executeQuery("select count(*) from parquetTable",
 "/tmp/ankit.txt",
 "s3n://AKIAJPLOFN3DM27DIIUQ:zKsFTopwgmu4zNdAfZ5Xe+Qe0XtbegHLTgy629VB@hadoop-poc-ashish/parquet");
 }
 }


 But I am getting below exception.

 *23:08:09.268 [main] WARN  org.apache.hadoop.hdfs.DFSClient - Failed to
 connect to /172.31.24.27:9200 for block, add to
 deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException:
 6 millis timeout while waiting for channel to be ready for connect. ch
 : java.nio.channels.SocketChannel[connection-pending
 remote=/172.31.24.27:9200]*
 org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
 waiting for channel to be ready for connect. ch :
 java.nio.channels.SocketChannel[connection-pending remote=/
 172.31.24.27:9200]
 at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
 ~[hadoop-common-2.2.0.jar:na]
 at
 

Re: How to restart Twitter spark stream

2015-07-20 Thread Akhil Das
Jorn meant something like this:

val filteredStream = twitterStream.transform(rdd => {

  val newRDD = scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))

  rdd.join(newRDD)

})

newRDD will work like a filter when you do the join.


Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic zoran.jere...@gmail.com
wrote:

 Hi Jorn,

 I didn't know that it is possible to change filter without re-opening
 twitter stream. Actually, I already had that question earlier at the
 stackoverflow
 http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming
 and I got the answer that it's not possible, but it would be even better if
 there is some other way to add new hashtags or to remove old hashtags that
 user stopped following. I guess the second request would be more difficult.

 However, it would be great if you can give me some short example how to
 make this. I didn't understand well from your explanation what you mean by
 join it with a rdd loading the newest hash tags from disk in a regular
 interval.

 Thanks,
 Zoran

 On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke jornfra...@gmail.com wrote:

 Why do you even want to stop it? You can join it with a rdd loading the
 newest hash tags from disk in a regular interval

 Le dim. 19 juil. 2015 à 7:40, Zoran Jeremic zoran.jere...@gmail.com a
 écrit :

 Hi,

 I have a twitter spark stream initialized in the following way:

   val ssc:StreamingContext =
 SparkLauncher.getSparkScalaStreamingContext()
   val config = getTwitterConfigurationBuilder.build()
   val auth: Option[twitter4j.auth.Authorization] =
 Some(new
 twitter4j.auth.OAuthAuthorization(config))
   val stream = TwitterUtils.createStream(ssc, auth,
 filters)


 This works fine when I initially start it. However, at some point I need
 to update the filters, since users might add new hashtags they want to follow. I
 tried to stop the running stream and the Spark streaming context without
 stopping the Spark context, e.g.:


stream.stop()
ssc.stop(false)


 Afterward, I'm trying to initialize a new Twitter stream like I did
 previously. However, I got this exception:

 Exception in thread Firestorm JMX Monitor
 java.lang.IllegalStateException: Adding new inputs, transformations, and
 output operations after stopping a context is not supported
 at
 org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
 at
 org.apache.spark.streaming.dstream.DStream.init(DStream.scala:64)
 at
 org.apache.spark.streaming.dstream.InputDStream.init(InputDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ReceiverInputDStream.init(ReceiverInputDStream.scala:41)
 at
 org.apache.spark.streaming.twitter.TwitterInputDStream.init(TwitterInputDStream.scala:46)
 at
 org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
 at
 org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
 at
 org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
 at
 org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
 at
 org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
 at
 org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
 at java.util.TimerThread.mainLoop(Timer.java:555)
 at java.util.TimerThread.run(Timer.java:505)
  INFO[2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing
 thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater
 has been closed
 ERROR[2015-07-18 22:24:32,503]
 [sparkDriver-akka.actor.default-dispatcher-3]
 streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error
 stopping receiver
 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)




 Anybody can explain how to solve this issue?

 Thanks,
 Zoran




 --

 ***
 Zoran Jeremic, PhD
 Senior System Analyst  Programmer

 Athabasca University
 Tel: +1 604 92 89 944
 E-mail: zoran.jere...@gmail.com zoran.jere...@va.mod.gov.rs
 Homepage:  http://zoranjeremic.org

 **



Re: Kmeans Labeled Point RDD

2015-07-20 Thread plazaster
Has there been any progress on this, I am in the same boat.

I posted a similar question to Stack Exchange.

http://stackoverflow.com/questions/31447141/spark-mllib-kmeans-from-dataframe-and-back-again



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-Labeled-Point-RDD-tp22989p23907.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Nicolas Phung
Hi Cody,

Thanks for your help. It seems there's something wrong with some messages
within my Kafka topics then. I don't understand how I can get a bigger or
incomplete message, since I use the default configuration to accept only 1 MB
messages in my Kafka topic. If you have any other information or
suggestions, please tell me.

Regards,
Nicolas PHUNG

On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote:

 Not exactly the same issue, but possibly related:

 https://issues.apache.org/jira/browse/KAFKA-1196

 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Well, working backwards down the stack trace...

 at java.nio.Buffer.limit(Buffer.java:275)

 That exception gets thrown if the limit is negative or greater than the 
 buffer's capacity


 at kafka.message.Message.sliceDelimited(Message.scala:236)

 If size had been negative, it would have just returned null, so we know
 the exception got thrown because the size was greater than the buffer's
 capacity


 I haven't seen that before... maybe a corrupted message of some kind?

 If that problem is reproducible, try providing an explicit argument for
 messageHandler, with a function that logs the message offset.


 On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung nicolas.ph...@gmail.com
 wrote:

 Hello,

 When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark 
 Streaming Kafka method createDirectStream, everything is fine till a driver 
 error happened (driver is killed, connection lost...). When the driver pops 
 up again, it resumes the processing with the checkpoint in HDFS. Except, I 
 got this:

 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; 
 aborting job
 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 
 1437032118000 ms.0
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 
 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 
 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
 at java.nio.Buffer.limit(Buffer.java:275)
 at kafka.message.Message.sliceDelimited(Message.scala:236)
 at kafka.message.Message.payload(Message.scala:218)
 at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
 at 
 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
 at 
 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
 at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at 
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
 at 
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at 
 org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
 at 
 org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
 at 
 org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
 at 
 org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 This is happening only when I'm doing a full data processing from Kafka.
 If there's no load, when you killed the driver and then restart, it resumes
 the checkpoint as expected without missing data. Did someone encounters
 something similar ? How did you solve this ?

 Regards,

 Nicolas PHUNG






Re: Using Dataframe write with newHdoopApi

2015-07-20 Thread ayan guha
Update: I have managed to use df.rdd to complete the ES integration, but I
would prefer df.write. Is it possible or upcoming?
On 18 Jul 2015 23:19, ayan guha guha.a...@gmail.com wrote:

 Hi

 I am trying to use DF and save it to Elasticsearch using newHadoopApi
 (because I am using python). Can anyone guide me to help if this is even
 possible?

 --
 Best Regards,
 Ayan Guha



Local Repartition

2015-07-20 Thread Daniel Haviv
Hi,
My data is constructed from a lot of small files which results in a lot of
partitions per RDD.
Is there some way to locally repartition the RDD without shuffling so that
all of the partitions that reside on a specific node will become X
partitions on the same node ?

Thank you.
Daniel


Proper saving/loading of MatrixFactorizationModel

2015-07-20 Thread Petr Shestov
Hi all!
I have a MatrixFactorizationModel object. If I try to recommend products to a
single user right after constructing the model through ALS.train(...), it takes
300 ms (for my data and hardware). But if I save the model to disk and load it back,
then a recommendation takes almost 2000 ms. Also Spark warns:
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor does not have a 
partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor is not cached. 
Prediction could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor does not have a 
partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor is not cached. 
Prediction could be slow.
How can I create/set a partitioner and cache the user and product factors after
loading the model? The following approach didn't help:
model.userFeatures().cache();
model.productFeatures().cache();
Also I tried to repartition those RDDs and create a new model from the
repartitioned versions, but that also didn't help.


---
This email message is for the sole use of the intended recipient(s) and may 
contain
confidential information.  Any unauthorized review, use, disclosure or 
distribution
is prohibited.  If you are not the intended recipient, please contact the 
sender by
reply email and destroy all copies of the original message.
---


PySpark Nested Json Parsing

2015-07-20 Thread Ajay
Hi,

I am new to Apache Spark. I am trying to parse nested JSON using PySpark.
Here is the code with which I am trying to parse the JSON.
I am using Apache Spark 1.2.0 from Cloudera CDH 5.3.2.

lines = sc.textFile(inputFile)

import json
def func(x):
json_str = json.loads(x)
if json_str['label']:
if json_str['label']['label2']:
return (1,1)
return (0,1)

lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile)

I am getting following error,
ERROR [Executor task launch worker-13] executor.Executor
(Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
line 107, in main
process()
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
line 98, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
line 247, in func
return f(iterator)
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
line 1561, in combineLocally
merger.mergeValues(iterator)
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py,
line 252, in mergeValues
for k, v in iterator:
  File stdin, line 2, in func
  File /usr/lib64/python2.6/json/__init__.py, line 307, in loads
return _default_decoder.decode(s)
  File /usr/lib64/python2.6/json/decoder.py, line 319, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File /usr/lib64/python2.6/json/decoder.py, line 336, in raw_decode
obj, end = self._scanner.iterscan(s, **kw).next()
  File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
rval, next_pos = action(m, context)
  File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
  File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
rval, next_pos = action(m, context)
  File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
value, end = iterscan(s, idx=end, context=context).next()
  File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
rval, next_pos = action(m, context)
  File /usr/lib64/python2.6/json/decoder.py, line 155, in JSONString
return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 855 (char 855)

at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12]
executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in
stage 14.0 (TID 24)
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
line 107, in main
process()
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
line 98, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
line 2073, in pipeline_func
return func(split, prev_func(split, iterator))
  File
/opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
line 247, in func
return f(iterator)
  File

Re: Using reference for RDD is safe?

2015-07-20 Thread Mina
Hi, thank you for your answer, but I was talking about a function reference.
I want to transform an RDD using a function consisting of multiple
transforms.
For example:
def transformFunc1(rdd: RDD[Int]): RDD[Int] = {


}


val rdd2 = transformFunc1(rdd1)...
Here I am using a reference, I think, but I am not sure.
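
For what it's worth, a minimal sketch of that pattern (the function body here is hypothetical; assumes an existing SparkContext sc): passing an RDD into a function and returning the transformed RDD is safe, because transformations only build new immutable RDDs and never modify the one passed in.

import org.apache.spark.rdd.RDD

// hypothetical body: chain several transforms inside one function
def transformFunc1(rdd: RDD[Int]): RDD[Int] =
  rdd.map(_ + 1).filter(_ % 2 == 0)

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = transformFunc1(rdd1)   // rdd2 is a new RDD; rdd1 is unchanged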




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-reference-for-RDD-is-safe-tp23843p23911.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



LDA on a large dataset

2015-07-20 Thread Peter Zvirinsky
Hello,

I'm trying to run LDA on a relatively large dataset (size 100-200 G), but
with no luck so far.

At first I made sure that the executors have enough memory with respect to
the vocabulary size and number of topics.

After that I ran LDA with the default EMLDAOptimizer, but learning failed after
a few iterations, because the application master ran out of disk. The
learning job used all the space available in the usercache of the application
master (ca. 100 G). I noticed that this implementation uses some sort of
checkpointing, so I made sure it is not used, but it didn't help.

Afterwards, I tried the OnlineLDAOptimizer, but it started failing at
reduce at LDAOptimizer.scala:421 with error message: Total size of
serialized results of X tasks (Y GB) is bigger than
spark.driver.maxResultSize (Y GB). I kept increasing the
spark.driver.maxResultSize to tens of GB but it didn't help, just delayed
this error. I tried to adjust the batch size to very small values so that I
was sure it must fit into memory, but this didn't help at all.

Has anyone experience with learning LDA on such a dataset? Maybe some ideas
what might be wrong?

I'm using spark 1.4.0 in yarn-client mode. I managed to learn a word2vec
model on the same dataset with no problems at all.

Thanks,
Peter


spark streaming 1.3 issues

2015-07-20 Thread Shushant Arora
Hi

1. I am using Spark Streaming 1.3 for reading from a Kafka queue and pushing
events to an external source.

I passed 20 executors to my job, but it is showing only 6 in the executor tab.
When I used the high-level API on streaming 1.2, it showed 20 executors. My cluster
is a 10-node YARN cluster and each node has 8 cores.

I am calling the script as :

spark-submit --class classname --num-executors 10 --executor-cores 2
--master yarn-client jarfile

2. On Streaming UI

Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
Time since start: 13 minutes 28 seconds
Network receivers: 0
Batch interval: 1 second
Processed batches: 807
Waiting batches: 0
Received records: 0
Processed records: 0

Received records and processed records are always 0, and the speed of
processing is slow compared to the high-level API.

I am processing the stream using mapPartitions.

When I used
directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
  @Override
  public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
    // TODO Auto-generated method stub
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
  }
}

It throws an exception
java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot
be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

Thanks
Shushant


Re: JdbcRDD and ClassTag issue

2015-07-20 Thread nitinkalra2000
Thanks Sujee :)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JdbcRDD-and-ClassTag-issue-tp18570p23912.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Local Repartition

2015-07-20 Thread Doug Balog
Hi Daniel,
Take a look at .coalesce()
I’ve seen good results by coalescing to num executors * 10, but I’m still 
trying to figure out the 
optimal number of partitions per executor. 
To get the number of executors, sc.getConf.getInt("spark.executor.instances", -1)
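
A minimal sketch of that approach (assuming rdd holds the many small-file partitions; shuffle = false merges existing partitions through a narrow dependency instead of a full shuffle):

val numExecutors = sc.getConf.getInt("spark.executor.instances", -1)
// merge down to roughly 10 partitions per executor without shuffling;
// coalesce prefers, but does not guarantee, grouping co-located partitions
val merged = rdd.coalesce(numExecutors * 10, shuffle = false)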


Cheers,

Doug

 On Jul 20, 2015, at 5:04 AM, Daniel Haviv daniel.ha...@veracity-group.com 
 wrote:
 
 Hi,
 My data is constructed from a lot of small files which results in a lot of 
 partitions per RDD.
 Is there some way to locally repartition the RDD without shuffling so that 
 all of the partitions that reside on a specific node will become X partitions 
 on the same node ?
 
 Thank you.
 Daniel


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Apache Spark : spark.eventLog.dir on Windows Environment

2015-07-20 Thread nitinkalra2000
Hi All,

I am working with Spark 1.4 in a Windows environment. I have to set the event log
directory so that I can reopen the Spark UI after the application has finished.

But I am not able to set spark.eventLog.dir; it gives an error on Windows.

The configuration is:

<entry key="spark.eventLog.enabled" value="true" />
<entry key="spark.eventLog.dir" value="file:///c:/sparklogs" />
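
For reference, the equivalent settings applied programmatically (a sketch; the app name, master, and Windows path are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("event-log-example")
  .setMaster("local[*]")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "file:///C:/sparklogs")

val sc = new SparkContext(conf)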

Exception I get :

java.io.IOException: Cannot run program cygpath: CreateProcess error=2,  
The system cannot find the file specified
at java.lang.ProcessBuilder.start(Unknown Source)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:206)

I have also tried installing Cygwin but still the error doesn't go.

Can anybody give any advice on it?

I have posted the same question on Stackoverflow as well :
http://stackoverflow.com/questions/31468716/apache-spark-spark-eventlog-dir-on-windows-environment

Thanks
Nitin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-spark-eventLog-dir-on-Windows-Environment-tp23913.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does Spark streaming support is there with RabbitMQ

2015-07-20 Thread Jeetendra Gangele
Does Apache Spark support RabbitMQ? I have messages on RabbitMQ and I want
to process them using Spark Streaming. Does it scale?

Regards
Jeetendra


Re: Does Spark streaming support is there with RabbitMQ

2015-07-20 Thread Todd Nist
There is one package available on the spark-packages site,

http://spark-packages.org/package/Stratio/RabbitMQ-Receiver

The source is here:

https://github.com/Stratio/RabbitMQ-Receiver

Not sure that meets your needs or not.

-Todd

On Mon, Jul 20, 2015 at 8:52 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Does Apache spark support RabbitMQ. I have messages on RabbitMQ and I want
 to process them using Apache Spark streaming does it scale?

 Regards
 Jeetendra



Re: spark streaming 1.3 issues

2015-07-20 Thread Shushant Arora
Is coalesce not applicable to a Kafka stream? How do I do a coalesce on a
kafkaDirectStream? It's not there in the API.
Will calling repartition on the direct stream, with the number of executors as
numPartitions, improve performance?

In 1.3, do tasks get launched for partitions which are empty? Does the driver
make a call to get the offsets of each partition separately, or does it get all
partitions' new offsets in a single call? I mean, will reducing the number of
partitions in Kafka help improve performance?

On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 Hi

 1.I am using spark streaming 1.3 for reading from a kafka queue and
 pushing events to external source.

 I passed in my job 20 executors but it is showing only 6 in executor tab ?
 When I used highlevel streaming 1.2 - its showing 20 executors. My cluster
 is 10 node yarn cluster with each node has 8 cores.

 I am calling the script as :

 spark-submit --class classname --num-executors 10 --executor-cores 2
 --master yarn-client jarfile

 2. On Streaming UI

 Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
 Time since start: 13 minutes 28 seconds
 Network receivers: 0
 Batch interval: 1 second
 Processed batches: 807
 Waiting batches: 0
 Received records: 0
 Processed records: 0

 Received records and processed records are always 0 . And Speed of
 processing is slow compare to highlevel api.

 I am procesing the stream using mapPartition.

 When I used
 directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
   @Override
   public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
     // TODO Auto-generated method stub
     OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
   }
 }

 It throws an exception
 java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot
 be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

 Thanks
 Shushant









Joda Time best practice?

2015-07-20 Thread algermissen1971
Hi,

I am having trouble with Joda Time in a Spark application and have seen by now that I
am not the only one (it generally seems to have to do with serialization and the
internal caches of the Joda Time objects).

Is there a known best practice to work around these issues?

Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does Spark streaming support is there with RabbitMQ

2015-07-20 Thread Jeetendra Gangele
Thanks Todd,

I'm not sure whether somebody has used it or not. Can somebody confirm that
this integrates nicely with Spark Streaming?


On 20 July 2015 at 18:43, Todd Nist tsind...@gmail.com wrote:

 There is one package available on the spark-packages site,

 http://spark-packages.org/package/Stratio/RabbitMQ-Receiver

 The source is here:

 https://github.com/Stratio/RabbitMQ-Receiver

 Not sure that meets your needs or not.

 -Todd

 On Mon, Jul 20, 2015 at 8:52 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Does Apache spark support RabbitMQ. I have messages on RabbitMQ and I
 want to process them using Apache Spark streaming does it scale?

 Regards
 Jeetendra





Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Cody Koeninger
I'd try logging the offsets for each message, see where problems start,
then try using the console consumer starting at those offsets and see if
you can reproduce the problem.
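
A minimal sketch of that in Scala (assuming an existing StreamingContext ssc; broker, topic, and starting offsets are hypothetical): pass an explicit messageHandler to the createDirectStream overload that takes one, and log each record's topic/partition/offset.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> 0L)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => {
    // log where each record came from before handing it downstream
    println(s"topic=${mmd.topic} partition=${mmd.partition} offset=${mmd.offset}")
    (mmd.key, mmd.message)
  })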

On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung nicolas.ph...@gmail.com
wrote:

 Hi Cody,

 Thanks for you help. It seems there's something wrong with some messages
 within my Kafka topics then. I don't understand how, I can get bigger or
 incomplete message since I use default configuration to accept only 1Mb
 message in my Kafka topic. If you have any others informations or
 suggestions, please tell me.

 Regards,
 Nicolas PHUNG

 On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Not exactly the same issue, but possibly related:

 https://issues.apache.org/jira/browse/KAFKA-1196

 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Well, working backwards down the stack trace...

 at java.nio.Buffer.limit(Buffer.java:275)

 That exception gets thrown if the limit is negative or greater than the 
 buffer's capacity


 at kafka.message.Message.sliceDelimited(Message.scala:236)

 If size had been negative, it would have just returned null, so we know
 the exception got thrown because the size was greater than the buffer's
 capacity


 I haven't seen that before... maybe a corrupted message of some kind?

 If that problem is reproducible, try providing an explicit argument for
 messageHandler, with a function that logs the message offset.


 On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung nicolas.ph...@gmail.com
  wrote:

 Hello,

 When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark 
 Streaming Kafka method createDirectStream, everything is fine till a 
 driver error happened (driver is killed, connection lost...). When the 
 driver pops up again, it resumes the processing with the checkpoint in 
 HDFS. Except, I got this:

 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 
 times; aborting job
 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 
 1437032118000 ms.0
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 
 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 
 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:275)
at kafka.message.Message.sliceDelimited(Message.scala:236)
at kafka.message.Message.payload(Message.scala:218)
at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
at 
 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
at 
 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
at 
 org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
at 
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at 
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
 org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
at 
 org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
at 
 org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at 
 org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

 This is happening only when I'm doing a full data processing from
 Kafka. If there's no load, when you killed the driver and then restart, it
 resumes the checkpoint as expected without missing data. Did someone
 encounters something similar ? How did you solve this ?

 Regards,

 Nicolas PHUNG







Web UI Links

2015-07-20 Thread Bob Corsaro
I'm running a Spark cluster and I'd like to access the Spark UI from
outside the LAN. The problem is that all the links are to internal IP addresses.
Is there any way to configure hostnames for each of the hosts in the cluster
and use those for the links?


k-means iteration not terminate

2015-07-20 Thread Pa Rö
hi community,

I have written a Spark k-means app. Now I run it on a cluster.
My job starts, and at iteration nine or ten the process stops.
In the Spark dashboard the status always shows running, but nothing
happens, no exceptions.

My settings are the following:
1000 input points
k=10
maxIteration=30
a three-node cluster (each node has 16 GB RAM and an 8-core i7)
I use Cloudera Live 5.4.4 with Spark 1.3

Maybe Spark needs more memory, or do I have a wrong setting?

best regards,
paul


Re: Local Repartition

2015-07-20 Thread Daniel Haviv
Thanks Doug,
coalesce might invoke a shuffle as well.
I don't think what I'm suggesting is a feature but it definitely should be.

Daniel

On Mon, Jul 20, 2015 at 4:15 PM, Doug Balog d...@balog.net wrote:

 Hi Daniel,
 Take a look at .coalesce()
 I’ve seen good results by coalescing to num executors * 10, but I’m still
 trying to figure out the
 optimal number of partitions per executor.
 To get the number of executors,
 sc.getConf.getInt(“spark.executor.instances”,-1)


 Cheers,

 Doug

  On Jul 20, 2015, at 5:04 AM, Daniel Haviv 
 daniel.ha...@veracity-group.com wrote:
 
  Hi,
  My data is constructed from a lot of small files which results in a lot
 of partitions per RDD.
  Is there some way to locally repartition the RDD without shuffling so
 that all of the partitions that reside on a specific node will become X
 partitions on the same node ?
 
  Thank you.
  Daniel




Re: PySpark Nested Json Parsing

2015-07-20 Thread Davies Liu
Could you try SQLContext.read.json()?

On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu dav...@databricks.com wrote:
 Before using the json file as text file, can you make sure that each
 json string can fit in one line? Because textFile() will split the
 file by '\n'

 On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
 Hi,

 I am new to Apache Spark. I am trying to parse nested json using pyspark.
 Here is the code by which I am trying to parse Json.
 I am using Apache Spark 1.2.0 version of cloudera CDH 5.3.2.

 lines = sc.textFile(inputFile)

 import json
 def func(x):
 json_str = json.loads(x)
 if json_str['label']:
 if json_str['label']['label2']:
 return (1,1)
 return (0,1)

 lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile)

 I am getting following error,
 ERROR [Executor task launch worker-13] executor.Executor
 (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 107, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 98, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 247, in func
 return f(iterator)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 1561, in combineLocally
 merger.mergeValues(iterator)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py,
 line 252, in mergeValues
 for k, v in iterator:
   File stdin, line 2, in func
   File /usr/lib64/python2.6/json/__init__.py, line 307, in loads
 return _default_decoder.decode(s)
   File /usr/lib64/python2.6/json/decoder.py, line 319, in decode
 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
   File /usr/lib64/python2.6/json/decoder.py, line 336, in raw_decode
 obj, end = self._scanner.iterscan(s, **kw).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
 value, end = iterscan(s, idx=end, context=context).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
 value, end = iterscan(s, idx=end, context=context).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 155, in JSONString
 return scanstring(match.string, match.end(), encoding, strict)
 ValueError: Invalid \escape: line 1 column 855 (char 855)

 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
 at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12]
 executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in
 stage 14.0 (TID 24)
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 107, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 98, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in 

Re: PySpark Nested Json Parsing

2015-07-20 Thread Davies Liu
Before using the json file as a text file, can you make sure that each
json string fits on one line? textFile() will split the
file by '\n'.

On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
 Hi,

 I am new to Apache Spark. I am trying to parse nested json using pyspark.
 Here is the code by which I am trying to parse Json.
 I am using Apache Spark 1.2.0 version of cloudera CDH 5.3.2.

 lines = sc.textFile(inputFile)

 import json
 def func(x):
 json_str = json.loads(x)
 if json_str['label']:
 if json_str['label']['label2']:
 return (1,1)
 return (0,1)

 lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile)

 I am getting following error,
 ERROR [Executor task launch worker-13] executor.Executor
 (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 107, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 98, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 247, in func
 return f(iterator)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 1561, in combineLocally
 merger.mergeValues(iterator)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py,
 line 252, in mergeValues
 for k, v in iterator:
   File stdin, line 2, in func
   File /usr/lib64/python2.6/json/__init__.py, line 307, in loads
 return _default_decoder.decode(s)
   File /usr/lib64/python2.6/json/decoder.py, line 319, in decode
 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
   File /usr/lib64/python2.6/json/decoder.py, line 336, in raw_decode
 obj, end = self._scanner.iterscan(s, **kw).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
 value, end = iterscan(s, idx=end, context=context).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
 value, end = iterscan(s, idx=end, context=context).next()
   File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
 rval, next_pos = action(m, context)
   File /usr/lib64/python2.6/json/decoder.py, line 155, in JSONString
 return scanstring(match.string, match.end(), encoding, strict)
 ValueError: Invalid \escape: line 1 column 855 (char 855)

 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
 at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12]
 executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in
 stage 14.0 (TID 24)
 org.apache.spark.api.python.PythonException: Traceback (most recent call
 last):
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 107, in main
 process()
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
 line 98, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
 line 2073, in pipeline_func
 return func(split, prev_func(split, iterator))
   File
 

dataframes sql order by not total ordering

2015-07-20 Thread Carol McDonald
The following query on the MovieLens dataset is sorting by the count of
ratings for a movie. It looks like the results are ordered by partition?
scala> val results = sqlContext.sql("select movies.title, movierates.maxr, movierates.minr, movierates.cntu from (SELECT ratings.product, max(ratings.rating) as maxr, min(ratings.rating) as minr, count(distinct user) as cntu FROM ratings group by ratings.product order by cntu desc) movierates join movies on movierates.product = movies.movieId")

scala> results.take(30).foreach(println)
[Right Stuff, The (1983),5.0,1.0,750]
[Lost in Space (1998),5.0,1.0,667]
[Dumb  Dumber (1994),5.0,1.0,660]
[Patch Adams (1998),5.0,1.0,474]
[Carlito's Way (1993),5.0,1.0,369]
[Rounders (1998),5.0,1.0,345]
[Bedknobs and Broomsticks (1971),5.0,1.0,319]
[Beverly Hills Ninja (1997),5.0,1.0,232]
[Saving Grace (2000),5.0,1.0,186]
[Dangerous Minds (1995),5.0,1.0,141]
[Death Wish II (1982),5.0,1.0,85]
[All Dogs Go to Heaven 2 (1996),5.0,1.0,75]
[Repossessed (1990),4.0,1.0,53]
[Assignment, The (1997),5.0,1.0,49]
[$1,000,000 Duck (1971),5.0,1.0,37]
[Stonewall (1995),5.0,1.0,20]
[Dog of Flanders, A (1999),5.0,1.0,8]
[Frogs for Snakes (1998),3.0,1.0,5]
[It's in the Water (1998),3.0,2.0,3]
[Twelve Monkeys (1995),5.0,1.0,1511]
[Ransom (1996),5.0,1.0,564]
[Alice in Wonderland (1951),5.0,1.0,525]
[City Slickers II: The Legend of Curly's Gold (1994),5.0,1.0,392]
[Eat Drink Man Woman (1994),5.0,1.0,346]
[Cube (1997),5.0,1.0,233]
[Omega Man, The (1971),5.0,1.0,224]
[Stepmom (1998),5.0,1.0,146]
[Metro (1997),5.0,1.0,100]
[Death Wish 3 (1985),5.0,1.0,72]
[Stalker (1979),5.0,1.0,52]


Re: Joda Time best practice?

2015-07-20 Thread Harish Butani
Hey Jan,

Can you provide more details on the serialization and cache issues.

If you are looking for datetime functionality with spark-sql please
consider:  https://github.com/SparklineData/spark-datetime It provides a
simple way to combine joda datetime expressions with spark sql.

regards,
Harish.

On Mon, Jul 20, 2015 at 7:37 AM, algermissen1971 algermissen1...@icloud.com
 wrote:

 Hi,

 I am having trouble with Joda Time in a Spark application and saw by now
 that I am not the only one (generally seems to have to do with
 serialization and internal caches of the Joda Time objects).

 Is there a known best practice to work around these issues?

 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




What is the correct syntax of using Spark streamingContext.fileStream()?

2015-07-20 Thread unk1102
Hi, I am trying to find the correct way to use the Spark Streaming API
streamingContext.fileStream(String, Class<K>, Class<V>, Class<F>)

I tried to find an example but could not find one anywhere in the Spark
documentation. I have to stream files in HDFS which are of a custom Hadoop
format.

  JavaPairDStream<Void, MyRecordWritable> input = streamingContext.fileStream(
      "/path/to/hdfs/stream/dir/",
      Void.class,
      MyRecordWritable.class,
      MyInputFormat.class,
      ??);

How do I implement the fourth argument, the Function class type shown as ?? above?
Please guide me, I am new to Spark Streaming. Thanks in advance.
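
For reference, here is a hedged sketch of the equivalent call in the Scala API (assuming a
StreamingContext named ssc, and that MyInputFormat extends the new-API
org.apache.hadoop.mapreduce.InputFormat[Void, MyRecordWritable]). The extra arguments are a
Path => Boolean predicate deciding which newly discovered files to process, plus a
newFilesOnly flag:

import org.apache.hadoop.fs.Path
import org.apache.spark.streaming.dstream.InputDStream

// Accept every file here; replace the predicate with real filtering logic if needed.
val input: InputDStream[(Void, MyRecordWritable)] =
  ssc.fileStream[Void, MyRecordWritable, MyInputFormat](
    "/path/to/hdfs/stream/dir/",
    (path: Path) => true,      // file filter
    newFilesOnly = true)       // only pick up files created after the context starts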



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-correct-syntax-of-using-Spark-streamingContext-fileStream-tp23916.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark Nested Json Parsing

2015-07-20 Thread Naveen Madhire
I had a similar issue with Spark 1.3.
After migrating to Spark 1.4 and using sqlContext.read.json it worked well.
I think you can look at the DataFrame select and explode options to read the
nested JSON elements, arrays etc.

Thanks.


On Mon, Jul 20, 2015 at 11:07 AM, Davies Liu dav...@databricks.com wrote:

 Could you try SQLContext.read.json()?

 On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu dav...@databricks.com wrote:
  Before using the json file as text file, can you make sure that each
  json string can fit in one line? Because textFile() will split the
  file by '\n'
 
  On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote:
  Hi,
 
  I am new to Apache Spark. I am trying to parse nested json using
 pyspark.
  Here is the code by which I am trying to parse Json.
  I am using Apache Spark 1.2.0 version of cloudera CDH 5.3.2.
 
  lines = sc.textFile(inputFile)
 
  import json
  def func(x):
  json_str = json.loads(x)
  if json_str['label']:
  if json_str['label']['label2']:
  return (1,1)
  return (0,1)
 
  lines.map(func).reduceByKey(lambda a,b: a +
 b).saveAsTextFile(outputFile)
 
  I am getting following error,
  ERROR [Executor task launch worker-13] executor.Executor
  (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID
 25)
  org.apache.spark.api.python.PythonException: Traceback (most recent call
  last):
File
 
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
  line 107, in main
  process()
File
 
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py,
  line 98, in process
  serializer.dump_stream(func(split_index, iterator), outfile)
File
 
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
  line 2073, in pipeline_func
  return func(split, prev_func(split, iterator))
File
 
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
  line 2073, in pipeline_func
  return func(split, prev_func(split, iterator))
File
 
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
  line 247, in func
  return f(iterator)
File
 
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py,
  line 1561, in combineLocally
  merger.mergeValues(iterator)
File
 
 /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py,
  line 252, in mergeValues
  for k, v in iterator:
File stdin, line 2, in func
File /usr/lib64/python2.6/json/__init__.py, line 307, in loads
  return _default_decoder.decode(s)
File /usr/lib64/python2.6/json/decoder.py, line 319, in decode
  obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File /usr/lib64/python2.6/json/decoder.py, line 336, in raw_decode
  obj, end = self._scanner.iterscan(s, **kw).next()
File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
  rval, next_pos = action(m, context)
File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
  value, end = iterscan(s, idx=end, context=context).next()
File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
  rval, next_pos = action(m, context)
File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
  value, end = iterscan(s, idx=end, context=context).next()
File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
  rval, next_pos = action(m, context)
File /usr/lib64/python2.6/json/decoder.py, line 155, in JSONString
  return scanstring(match.string, match.end(), encoding, strict)
  ValueError: Invalid \escape: line 1 column 855 (char 855)
 
  at
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
  at
 org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174)
  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
  at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
  at
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
  at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
  2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12]
  executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0
 in
  stage 14.0 (TID 24)
  org.apache.spark.api.python.PythonException: Traceback (most recent call
  

RE: Spark and SQL Server

2015-07-20 Thread Young, Matthew T
When attempting to write a DataFrame to SQL Server that contains 
java.sql.Timestamp or java.lang.Boolean objects, I get errors about the generated query 
being invalid. Specifically, java.sql.Timestamp objects are written as the Timestamp type, 
which is not an appropriate date/time storage type in T-SQL; they should be datetime or 
datetime2 columns.

java.lang.Boolean errors out because Spark tries to specify the width of the BIT 
field, which SQL Server doesn't like.

However, I can write strings/varchars and ints without any issues.
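
One possible workaround until the JDBC type mapping handles these cases is sketched below
(hedged; the column names, table name, and the df/url/connectionProperties variables are
assumptions): cast the problematic columns to strings before writing, since varchar and int
columns round-trip fine.

import org.apache.spark.sql.functions.col

val writable = df.select(
  col("id"),
  col("event_time").cast("string").alias("event_time"),   // was java.sql.Timestamp
  col("is_active").cast("string").alias("is_active"))     // was java.lang.Boolean

writable.write.jdbc(url, "dbo.MyTable", connectionProperties)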



From: Davies Liu [dav...@databricks.com]
Sent: Monday, July 20, 2015 9:08 AM
To: Young, Matthew T
Cc: user@spark.apache.org
Subject: Re: Spark and SQL Server

Sorry for the confusion. What are the other issues?

On Mon, Jul 20, 2015 at 8:26 AM, Young, Matthew T
matthew.t.yo...@intel.com wrote:
 Thanks Davies, that resolves the issue with Python.

 I was using the Java/Scala DataFrame documentation 
 https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrameWriter.html
  and assuming that it was the same for PySpark 
 http://spark.apache.org/docs/1.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.
  I will keep this distinction in mind going forward.

 I guess we have to wait for Microsoft to release an SQL Server connector for 
 Spark to resolve the other issues.

 Cheers,

 -- Matthew Young

 
 From: Davies Liu [dav...@databricks.com]
 Sent: Saturday, July 18, 2015 12:45 AM
 To: Young, Matthew T
 Cc: user@spark.apache.org
 Subject: Re: Spark and SQL Server

 I think you have a mistake on call jdbc(), it should be:

 jdbc(self, url, table, mode, properties)

 You had use properties as the third parameter.

 On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T
 matthew.t.yo...@intel.com wrote:
 Hello,

 I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 
 4.2 JDBC Driver. Reading from the database works ok, but I have encountered 
 a couple of issues writing back. In Scala 2.10 I can write back to the 
 database except for a couple of types.


 1.  When I read a DataFrame from a table that contains a datetime column 
 it comes in as a java.sql.Timestamp object in the DataFrame. This is alright 
 for Spark purposes, but when I go to write this back to the database with 
 df.write.jdbc(…) it errors out because it is trying to write the TimeStamp 
 type to SQL Server, which is not a date/time storing type in TSQL. I think 
 it should be writing a datetime, but I’m not sure how to tell Spark this.



 2.  A related misunderstanding happens when I try to write a 
 java.lang.boolean to the database; it errors out because Spark is trying to 
 specify the width of the bit type, which is illegal in SQL Server (error 
 msg: Cannot specify a column width on data type bit). Do I need to edit 
 Spark source to fix this behavior, or is there a configuration option 
 somewhere that I am not aware of?


 When I attempt to write back to SQL Server in an IPython notebook, py4j 
 seems unable to convert a Python dict into a Java hashmap, which is 
 necessary for parameter passing. I’ve documented details of this problem 
 with code examples 
 herehttp://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session.
  Any advice would be appreciated.

 Thank you for your time,

 -- Matthew Young

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Cody Koeninger
Yeah, in the function you supply for the messageHandler parameter to
createDirectStream, catch the exception and do whatever makes sense for
your application.
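
A minimal Scala sketch of that idea (ssc, kafkaParams and fromOffsets are assumed to be set
up elsewhere; the (offset, message) pair is just an example payload):

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Decode each record inside the handler and return None for records that throw,
// so corrupt messages are skipped instead of failing the task.
val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
  try {
    Some((mmd.offset, mmd.message()))
  } catch {
    case e: Exception =>
      // log mmd.topic / mmd.partition / mmd.offset here before skipping
      None
  }

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder, Option[(Long, String)]](ssc, kafkaParams, fromOffsets, messageHandler)

val valid = stream.flatMap(_.toSeq)   // drop the skipped records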

On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung nicolas.ph...@gmail.com
wrote:

 Hello,

 Using the old Spark Streaming Kafka API, I got the following around the
 same offset:

 kafka.message.InvalidMessageException: Message is corrupt (stored crc =
 3561357254, computed crc = 171652633)
 at kafka.message.Message.ensureValid(Message.scala:166)
 at
 kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
 at
 kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
 at
 kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
 at
 org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
 java.lang.IllegalStateException: Iterator is in failed state
 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
 at
 org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 I found some old topic about some possible corrupt Kafka message produced
 by the new producer API with Snappy compression on. My question is, is it
 possible to skip/ignore those offsets when full processing with 
 KafkaUtils.createStream
 or KafkaUtils.createDirectStream ?

 Regards,
 Nicolas PHUNG

 On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I'd try logging the offsets for each message, see where problems start,
 then try using the console consumer starting at those offsets and see if
 you can reproduce the problem.

 On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung nicolas.ph...@gmail.com
 wrote:

 Hi Cody,

 Thanks for you help. It seems there's something wrong with some messages
 within my Kafka topics then. I don't understand how, I can get bigger or
 incomplete message since I use default configuration to accept only 1Mb
 message in my Kafka topic. If you have any others informations or
 suggestions, please tell me.

 Regards,
 Nicolas PHUNG

 On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Not exactly the same issue, but possibly related:

 https://issues.apache.org/jira/browse/KAFKA-1196

 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Well, working backwards down the stack trace...

 at java.nio.Buffer.limit(Buffer.java:275)

 That exception gets thrown if the limit is negative or greater than the 
 buffer's capacity


 at kafka.message.Message.sliceDelimited(Message.scala:236)

 If size had been negative, it would have just returned null, so we
 know the exception got thrown because the size was greater than the
 buffer's capacity


 I haven't seen that before... maybe a corrupted message of some kind?

 If that problem is reproducible, try providing an explicit argument
 for messageHandler, with a function that logs the message offset.


 On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung 
 nicolas.ph...@gmail.com wrote:

 Hello,

 When I'm reprocessing the data from kafka (about 40 Gb) with the new 
 Spark Streaming Kafka method createDirectStream, everything is fine till 
 a driver error happened (driver is killed, connection lost...). When the 
 driver pops up again, it resumes the processing with the checkpoint in 
 HDFS. Except, I got this:

 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 
 times; aborting job
 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 
 1437032118000 ms.0
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 
 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in 
 stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
  at java.nio.Buffer.limit(Buffer.java:275)
  at kafka.message.Message.sliceDelimited(Message.scala:236)
  at kafka.message.Message.payload(Message.scala:218)
  at 

Broadcast variables in R

2015-07-20 Thread Serge Franchois
I've searched high and low to use broadcast variables in R.
Is it possible at all? I don't see them mentioned in the SparkR API.
Or is there another way of using this feature?

I need to share a large amount of data between executors. 
At the moment, I get warned about my task being too large. 

I have tried pyspark, and there I can use them.

Wkr,

Serge




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-in-R-tp23915.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark and SQL Server

2015-07-20 Thread Young, Matthew T
Thanks Davies, that resolves the issue with Python.

I was using the Java/Scala DataFrame documentation 
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrameWriter.html
 and assuming that it was the same for PySpark 
http://spark.apache.org/docs/1.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.
 I will keep this distinction in mind going forward.

I guess we have to wait for Microsoft to release an SQL Server connector for 
Spark to resolve the other issues.

Cheers,

-- Matthew Young


From: Davies Liu [dav...@databricks.com]
Sent: Saturday, July 18, 2015 12:45 AM
To: Young, Matthew T
Cc: user@spark.apache.org
Subject: Re: Spark and SQL Server

I think you have a mistake on call jdbc(), it should be:

jdbc(self, url, table, mode, properties)

You had use properties as the third parameter.

On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T
matthew.t.yo...@intel.com wrote:
 Hello,

 I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 
 4.2 JDBC Driver. Reading from the database works ok, but I have encountered a 
 couple of issues writing back. In Scala 2.10 I can write back to the database 
 except for a couple of types.


 1.  When I read a DataFrame from a table that contains a datetime column 
 it comes in as a java.sql.Timestamp object in the DataFrame. This is alright 
 for Spark purposes, but when I go to write this back to the database with 
 df.write.jdbc(…) it errors out because it is trying to write the TimeStamp 
 type to SQL Server, which is not a date/time storing type in TSQL. I think it 
 should be writing a datetime, but I’m not sure how to tell Spark this.



 2.  A related misunderstanding happens when I try to write a 
 java.lang.boolean to the database; it errors out because Spark is trying to 
 specify the width of the bit type, which is illegal in SQL Server (error msg: 
 Cannot specify a column width on data type bit). Do I need to edit Spark 
 source to fix this behavior, or is there a configuration option somewhere 
 that I am not aware of?


 When I attempt to write back to SQL Server in an IPython notebook, py4j seems 
 unable to convert a Python dict into a Java hashmap, which is necessary for 
 parameter passing. I’ve documented details of this problem with code examples 
 herehttp://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session.
  Any advice would be appreciated.

 Thank you for your time,

 -- Matthew Young

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Local Repartition

2015-07-20 Thread Daniel Haviv
Great explanation.

Thanks guys!

Daniel

 On 20 July 2015, at 18:12, Silvio Fiorito silvio.fior...@granturing.com 
 wrote:
 
 Hi Daniel,
 
 Coalesce, by default will not cause a shuffle. The second parameter when set 
 to true will cause a full shuffle. This is actually what repartition does 
 (calls coalesce with shuffle=true).
 
 It will attempt to keep colocated partitions together (as you describe) on 
 the same executor. What may happen is you lose data locality if you reduce 
 the partitions to fewer than the number of executors. You obviously also 
 reduce parallelism so you need to be aware of that as you decide when to call 
 coalesce.
 
 Thanks,
 Silvio
 
 From: Daniel Haviv
 Date: Monday, July 20, 2015 at 4:59 PM
 To: Doug Balog
 Cc: user
 Subject: Re: Local Repartition
 
 Thanks Doug,
 coalesce might invoke a shuffle as well. 
 I don't think what I'm suggesting is a feature but it definitely should be.
 
 Daniel
 
 On Mon, Jul 20, 2015 at 4:15 PM, Doug Balog d...@balog.net wrote:
 Hi Daniel,
 Take a look at .coalesce()
 I’ve seen good results by coalescing to num executors * 10, but I’m still 
 trying to figure out the
 optimal number of partitions per executor.
 To get the number of executors, 
 sc.getConf.getInt(“spark.executor.instances”,-1)
 
 
 Cheers,
 
 Doug
 
  On Jul 20, 2015, at 5:04 AM, Daniel Haviv 
  daniel.ha...@veracity-group.com wrote:
 
  Hi,
  My data is constructed from a lot of small files which results in a lot of 
  partitions per RDD.
  Is there some way to locally repartition the RDD without shuffling so that 
  all of the partitions that reside on a specific node will become X 
  partitions on the same node ?
 
  Thank you.
  Daniel
 


Re: use S3-Compatible Storage with spark

2015-07-20 Thread Schmirr Wurst
Thanks, that is what I was looking for...

Any idea where I have to store and reference the corresponding
hadoop-aws-2.6.0.jar? I'm getting:

java.io.IOException: No FileSystem for scheme: s3n
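
A hedged sketch tying the two points together (endpoint, bucket and key values are made up):
the endpoint can also be set programmatically on the SparkContext's Hadoop configuration, and
hadoop-aws-2.6.0.jar (together with its AWS SDK dependency) just has to be on the driver and
executor classpaths, for example via spark-submit --jars.

// Point the s3a filesystem at a non-Amazon, S3-compatible endpoint.
sc.hadoopConfiguration.set("fs.s3a.endpoint", "pithos.example.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", "MY_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "MY_SECRET_KEY")

// Use the s3a:// scheme so the fs.s3a.* settings (and hadoop-aws) are picked up.
val data = sc.textFile("s3a://my-bucket/path/to/data")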

2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
 Not in the uri, but in the hadoop configuration you can specify it.

 property
   namefs.s3a.endpoint/name
   descriptionAWS S3 endpoint to connect to. An up-to-date list is
 provided in the AWS Documentation: regions and endpoints. Without this
 property, the standard region (s3.amazonaws.com) is assumed.
   /description
 /property


 Thanks
 Best Regards

 On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com
 wrote:

 I want to use pithos, were do I can specify that endpoint, is it
 possible in the url ?

 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
  Could you name the Storage service that you are using? Most of them
  provides
  a S3 like RestAPI endpoint for you to hit.
 
  Thanks
  Best Regards
 
  On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com
  wrote:
 
  Hi,
 
  I wonder how to use S3 compatible Storage in Spark ?
  If I'm using s3n:// url schema, the it will point to amazon, is there
  a way I can specify the host somewhere ?
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Local Repartition

2015-07-20 Thread Silvio Fiorito
Hi Daniel,

Coalesce, by default will not cause a shuffle. The second parameter when set to 
true will cause a full shuffle. This is actually what repartition does (calls 
coalesce with shuffle=true).

It will attempt to keep colocated partitions together (as you describe) on the 
same executor. What may happen is you lose data locality if you reduce the 
partitions to fewer than the number of executors. You obviously also reduce 
parallelism so you need to be aware of that as you decide when to call coalesce.

Thanks,
Silvio
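
A minimal Scala sketch of the approach described above (rdd is an existing RDD; the factor of
10 is just the heuristic mentioned earlier in the thread):

val numExecutors = sc.getConf.getInt("spark.executor.instances", -1)
val targetPartitions =
  if (numExecutors > 0) numExecutors * 10 else rdd.partitions.length

val compacted = rdd.coalesce(targetPartitions)                     // no shuffle
// val reshuffled = rdd.coalesce(targetPartitions, shuffle = true) // same as repartition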

From: Daniel Haviv
Date: Monday, July 20, 2015 at 4:59 PM
To: Doug Balog
Cc: user
Subject: Re: Local Repartition

Thanks Doug,
coalesce might invoke a shuffle as well.
I don't think what I'm suggesting is a feature but it definitely should be.

Daniel

On Mon, Jul 20, 2015 at 4:15 PM, Doug Balog 
d...@balog.netmailto:d...@balog.net wrote:
Hi Daniel,
Take a look at .coalesce()
I’ve seen good results by coalescing to num executors * 10, but I’m still 
trying to figure out the
optimal number of partitions per executor.
To get the number of executors, sc.getConf.getInt(“spark.executor.instances”,-1)


Cheers,

Doug

 On Jul 20, 2015, at 5:04 AM, Daniel Haviv 
 daniel.ha...@veracity-group.commailto:daniel.ha...@veracity-group.com 
 wrote:

 Hi,
 My data is constructed from a lot of small files which results in a lot of 
 partitions per RDD.
 Is there some way to locally repartition the RDD without shuffling so that 
 all of the partitions that reside on a specific node will become X partitions 
 on the same node ?

 Thank you.
 Daniel




Re: What else is need to setup native support of BLAS/LAPACK with Spark?

2015-07-20 Thread Arun Ahuja
Cool, I tried that as well, and it doesn't seem different:

spark.yarn.jar seems set

[image: Inline image 1]

This actually doesn't change the classpath, not sure if it should:

[image: Inline image 3]

But same netlib warning.

Thanks for the help!
- Arun

On Fri, Jul 17, 2015 at 3:18 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Can you try setting the spark.yarn.jar property to make sure it points to
 the jar you're thinking of?

 -Sandy

 On Fri, Jul 17, 2015 at 11:32 AM, Arun Ahuja aahuj...@gmail.com wrote:

 Yes, it's a YARN cluster and using spark-submit to run.  I have
 SPARK_HOME set to the directory above and using the spark-submit script
 from there.

 bin/spark-submit --master yarn-client --executor-memory 10g --driver-memory 
 8g --num-executors 400 --executor-cores 1 --class 
 org.hammerlab.guacamole.Guacamole --conf spark.default.parallelism=4000 
 --conf spark.storage.memoryFraction=0.15

 ​

 libgfortran.so.3 is also there

 ls  /usr/lib64/libgfortran.so.3
 /usr/lib64/libgfortran.so.3

 These are jniloader files in the jar

 jar tf 
 /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  | grep jniloader
 META-INF/maven/com.github.fommil/jniloader/
 META-INF/maven/com.github.fommil/jniloader/pom.xml
 META-INF/maven/com.github.fommil/jniloader/pom.properties

 ​

 Thanks,
 Arun

 On Fri, Jul 17, 2015 at 1:30 PM, Sean Owen so...@cloudera.com wrote:

 Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue.

 I'm pretty sure the answer is 'yes', but, make sure the assembly has
 jniloader too. I don't see why it wouldn't, but, that's needed.

 What is your env like -- local, standalone, YARN? how are you running?
 Just want to make sure you are using this assembly across your cluster.

 On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote:

 Hi Sean,

 Thanks for the reply! I did double-check that the jar is one I think I
 am running:

 [image: Inline image 2]

 jar tf 
 /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar
  | grep netlib | grep Native
 com/github/fommil/netlib/NativeRefARPACK.class
 com/github/fommil/netlib/NativeRefBLAS.class
 com/github/fommil/netlib/NativeRefLAPACK.class
 com/github/fommil/netlib/NativeSystemARPACK.class
 com/github/fommil/netlib/NativeSystemBLAS.class
 com/github/fommil/netlib/NativeSystemLAPACK.class

 Also, I checked the gfortran version on the cluster nodes and it is
 available and is 5.1

 $ gfortran --version
 GNU Fortran (GCC) 5.1.0
 Copyright (C) 2015 Free Software Foundation, Inc.

 and still see:

 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: 
 com.github.fommil.netlib.NativeSystemBLAS
 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: 
 com.github.fommil.netlib.NativeRefBLAS
 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: 
 com.github.fommil.netlib.NativeSystemLAPACK
 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: 
 com.github.fommil.netlib.NativeRefLAPACK

 ​

 Does anything need to be adjusted in my application POM?

 Thanks,
 Arun

 On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote:

 Yes, that's most of the work, just getting the native libs into the
 assembly. netlib can find them from there even if you don't have BLAS
 libs on your OS, since it includes a reference implementation as a
 fallback.

 One common reason it won't load is not having libgfortran installed on
 your OSes though. It has to be 4.6+ too. That can't be shipped even in
 netlib and has to exist on your hosts.

 The other thing I'd double-check is whether you are really using this
 assembly you built for your job -- like, it's the actually the
 assembly the executors are using.


 On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote:
  Is there more documentation on what is needed to setup BLAS/LAPACK
 native
  suport with Spark.
 
  I’ve built spark with the -Pnetlib-lgpl flag and see that the netlib
 classes
  are in the assembly jar.
 
  jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar  | grep netlib
 | grep
  Native
6625 Tue Jul 07 15:22:08 EDT 2015
  com/github/fommil/netlib/NativeRefARPACK.class
   21123 Tue Jul 07 15:22:08 EDT 2015
  com/github/fommil/netlib/NativeRefBLAS.class
  178334 Tue Jul 07 15:22:08 EDT 2015
  com/github/fommil/netlib/NativeRefLAPACK.class
6640 Tue Jul 07 15:22:10 EDT 2015
  com/github/fommil/netlib/NativeSystemARPACK.class
   21138 Tue Jul 07 15:22:10 EDT 2015
  com/github/fommil/netlib/NativeSystemBLAS.class
  178349 Tue Jul 07 15:22:10 EDT 2015
  com/github/fommil/netlib/NativeSystemLAPACK.class
 
  Also I see the following in /usr/lib64
 
  ls /usr/lib64/libblas.
  libblas.a libblas.solibblas.so.3  libblas.so.3.2
  libblas.so.3.2.1
 
  ls /usr/lib64/liblapack
  liblapack.a liblapack_pic.a liblapack.so
 liblapack.so.3
  liblapack.so.3.2liblapack.so.3.2.1
 

Re: Spark and SQL Server

2015-07-20 Thread Davies Liu
Sorry for the confusion. What are the other issues?

On Mon, Jul 20, 2015 at 8:26 AM, Young, Matthew T
matthew.t.yo...@intel.com wrote:
 Thanks Davies, that resolves the issue with Python.

 I was using the Java/Scala DataFrame documentation 
 https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrameWriter.html
  and assuming that it was the same for PySpark 
 http://spark.apache.org/docs/1.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.
  I will keep this distinction in mind going forward.

 I guess we have to wait for Microsoft to release an SQL Server connector for 
 Spark to resolve the other issues.

 Cheers,

 -- Matthew Young

 
 From: Davies Liu [dav...@databricks.com]
 Sent: Saturday, July 18, 2015 12:45 AM
 To: Young, Matthew T
 Cc: user@spark.apache.org
 Subject: Re: Spark and SQL Server

 I think you have a mistake on call jdbc(), it should be:

 jdbc(self, url, table, mode, properties)

 You had use properties as the third parameter.

 On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T
 matthew.t.yo...@intel.com wrote:
 Hello,

 I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 
 4.2 JDBC Driver. Reading from the database works ok, but I have encountered 
 a couple of issues writing back. In Scala 2.10 I can write back to the 
 database except for a couple of types.


 1.  When I read a DataFrame from a table that contains a datetime column 
 it comes in as a java.sql.Timestamp object in the DataFrame. This is alright 
 for Spark purposes, but when I go to write this back to the database with 
 df.write.jdbc(…) it errors out because it is trying to write the TimeStamp 
 type to SQL Server, which is not a date/time storing type in TSQL. I think 
 it should be writing a datetime, but I’m not sure how to tell Spark this.



 2.  A related misunderstanding happens when I try to write a 
 java.lang.boolean to the database; it errors out because Spark is trying to 
 specify the width of the bit type, which is illegal in SQL Server (error 
 msg: Cannot specify a column width on data type bit). Do I need to edit 
 Spark source to fix this behavior, or is there a configuration option 
 somewhere that I am not aware of?


 When I attempt to write back to SQL Server in an IPython notebook, py4j 
 seems unable to convert a Python dict into a Java hashmap, which is 
 necessary for parameter passing. I’ve documented details of this problem 
 with code examples 
 herehttp://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session.
  Any advice would be appreciated.

 Thank you for your time,

 -- Matthew Young

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing

2015-07-20 Thread Nicolas Phung
Hello,

Using the old Spark Streaming Kafka API, I got the following around the
same offset:

kafka.message.InvalidMessageException: Message is corrupt (stored crc =
3561357254, computed crc = 171652633)
at kafka.message.Message.ensureValid(Message.scala:166)
at
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
at
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
at
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I found some old topic about some possible corrupt Kafka message produced
by the new producer API with Snappy compression on. My question is, is it
possible to skip/ignore those offsets when full processing with
KafkaUtils.createStream
or KafkaUtils.createDirectStream ?

Regards,
Nicolas PHUNG

On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger c...@koeninger.org wrote:

 I'd try logging the offsets for each message, see where problems start,
 then try using the console consumer starting at those offsets and see if
 you can reproduce the problem.

 On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung nicolas.ph...@gmail.com
 wrote:

 Hi Cody,

 Thanks for you help. It seems there's something wrong with some messages
 within my Kafka topics then. I don't understand how, I can get bigger or
 incomplete message since I use default configuration to accept only 1Mb
 message in my Kafka topic. If you have any others informations or
 suggestions, please tell me.

 Regards,
 Nicolas PHUNG

 On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Not exactly the same issue, but possibly related:

 https://issues.apache.org/jira/browse/KAFKA-1196

 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Well, working backwards down the stack trace...

 at java.nio.Buffer.limit(Buffer.java:275)

 That exception gets thrown if the limit is negative or greater than the 
 buffer's capacity


 at kafka.message.Message.sliceDelimited(Message.scala:236)

 If size had been negative, it would have just returned null, so we know
 the exception got thrown because the size was greater than the buffer's
 capacity


 I haven't seen that before... maybe a corrupted message of some kind?

 If that problem is reproducible, try providing an explicit argument for
 messageHandler, with a function that logs the message offset.


 On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung 
 nicolas.ph...@gmail.com wrote:

 Hello,

 When I'm reprocessing the data from kafka (about 40 Gb) with the new 
 Spark Streaming Kafka method createDirectStream, everything is fine till 
 a driver error happened (driver is killed, connection lost...). When the 
 driver pops up again, it resumes the processing with the checkpoint in 
 HDFS. Except, I got this:

 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 
 times; aborting job
 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 
 1437032118000 ms.0
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 
 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 
 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
   at java.nio.Buffer.limit(Buffer.java:275)
   at kafka.message.Message.sliceDelimited(Message.scala:236)
   at kafka.message.Message.payload(Message.scala:218)
   at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
   at 
 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
   at 
 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
   at 
 

Re: Joda Time best practice?

2015-07-20 Thread Harish Butani
Can you post details on how to reproduce the NPE

On Mon, Jul 20, 2015 at 1:19 PM, algermissen1971 algermissen1...@icloud.com
 wrote:

 Hi Harish,

 On 20 Jul 2015, at 20:37, Harish Butani rhbutani.sp...@gmail.com wrote:

  Hey Jan,
 
  Can you provide more details on the serialization and cache issues.

 My symptom is that I have a Joda DateTime on which I can call toString and
 getMillis without problems, but when I call getYear I get a NPE out of the
 internal AbstractDateTime. Totally strange but seems to align with issues
 others have.

 I am now changing the app to work with millis internally, as that seems to
 be a performance improvement regarding serialization anyhow.

 Thanks,

 Jan


 
  If you are looking for datetime functionality with spark-sql please
 consider:  https://github.com/SparklineData/spark-datetime It provides a
 simple way to combine joda datetime expressions with spark sql.
 
  regards,
  Harish.
 
  On Mon, Jul 20, 2015 at 7:37 AM, algermissen1971 
 algermissen1...@icloud.com wrote:
  Hi,
 
  I am having trouble with Joda Time in a Spark application and saw by now
 that I am not the only one (generally seems to have to do with
 serialization and internal caches of the Joda Time objects).
 
  Is there a known best practice to work around these issues?
 
  Jan
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 




Increment counter variable in RDD transformation function

2015-07-20 Thread dlmarion
 

I'm trying to keep track of some information in an RDD.flatMap() function
(using the Java API in 1.4.0). I have two longs in the function, which I
increment when appropriate, and I check their values to determine
how many objects to output from the function. I'm not trying to read the
values in the driver or use them as a global counter, just trying to count
within the task. This appears not to be working. Should I expect this to
work? If so, any pointers as to what I might be doing wrong? The code looks
something like:

 

JavaRDD<LabeledPoint> parsedData = data.map(new Function<String, LabeledPoint>() {

    Long count = 0L;

    public LabeledPoint call(String line) {
        count++;
        String[] parts = line.split(",");
        String[] features = parts[1].split(" ");
        double[] v = new double[features.length];
        for (int i = 0; i < features.length - 1; i++)
            v[i] = Double.parseDouble(features[i]);
        if (count == 50) {
            // return something else
        }
        return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
    }
});
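
A hedged Scala sketch of one alternative (not necessarily a diagnosis of the Java version
above): keep the counter as a local variable inside mapPartitions, so it is created once per
task and incremented as the partition's iterator is consumed. data is assumed to be the same
RDD of CSV lines.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val parsedData = data.mapPartitions { lines =>
  var count = 0L                      // local to this task/partition
  lines.map { line =>
    count += 1
    val parts = line.split(",")
    val v = parts(1).split(" ").map(_.toDouble)
    if (count == 50) {
      // emit something different for the 50th record of this partition
    }
    LabeledPoint(parts(0).toDouble, Vectors.dense(v))
  }
}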

 



Re: LDA on a large dataset

2015-07-20 Thread Feynman Liang
LDAOptimizer.scala:421 collects to driver a numTopics by vocabSize matrix
of summary statistics. I suspect that this is what's causing the failure.

One thing you may try doing is decreasing the vocabulary size. One
possibility would be to use a HashingTF if you don't mind dimension
reduction via hashing collisions.
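
A minimal Scala sketch of that idea (tokenizedDocs is an assumed RDD[(Long, Seq[String])] of
document IDs and tokens; the feature count, k and iteration count are arbitrary example
values):

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Hash tokens into a fixed, much smaller feature space before running LDA.
val hashingTF = new HashingTF(1 << 18)
val corpus: RDD[(Long, Vector)] =
  tokenizedDocs.map { case (id, tokens) => (id, hashingTF.transform(tokens)) }
corpus.cache()

val ldaModel = new LDA().setK(20).setMaxIterations(10).run(corpus)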

On Mon, Jul 20, 2015 at 3:21 AM, Peter Zvirinsky peter.zvirin...@gmail.com
wrote:

 Hello,

 I'm trying to run LDA on a relatively large dataset (size 100-200 G), but
 with no luck so far.

 At first I made sure that the executors have enough memory with respect to
 the vocabulary size and number of topics.

 After that I ran LDA with default EMLDAOptimizer, but learning failed
 after a few iteration, because the application master ran out of disk. The
 learning job used all space available in the usercache of the application
 master (cca. 100G). I noticed that this implementation uses some sort of
 checkopointing so I made sure it is not used, but it didn't help.

 Afterwards, I tried the OnlineLDAOptimizer, but it started failing at
 reduce at LDAOptimizer.scala:421 with error message: Total size of
 serialized results of X tasks (Y GB) is bigger than
 spark.driver.maxResultSize (Y GB). I kept increasing the
 spark.driver.maxResultSize to tens of GB but it didn't help, just delayed
 this error. I tried to adjust the batch size to very small values so that I
 was sure it must fit into memory, but this didn't help at all.

 Has anyone experience with learning LDA on such a dataset? Maybe some
 ideas what might be wrong?

 I'm using spark 1.4.0 in yarn-client mode. I managed to learn a word2vec
 model on the same dataset with no problems at all.

 Thanks,
 Peter



Re: Silly question about building Spark 1.4.1

2015-07-20 Thread Michael Segel
Thanks Dean… 

I was building based on the information found on the Spark 1.4.1 documentation. 

So I have to ask the following:

Shouldn’t the examples be updated to reflect Hadoop 2.6, or are the vendors’ 
distros not up to 2.6 and that’s why it still shows 2.4? 

Also, I’m trying to build with support for Scala 2.11.  
Are there any known issues between Scala 2.11 and Hive and the Hive Thrift server? 

Dean, the reason I asked about needing to specify the Hive and Hive-Thriftserver 
options is that at the end of the build I see the following:
“
[INFO] Spark Project SQL .. SUCCESS [02:06 min]
[INFO] Spark Project ML Library ... SUCCESS [02:23 min]
[INFO] Spark Project Tools  SUCCESS [ 13.305 s]
[INFO] Spark Project Hive . SUCCESS [01:55 min]
[INFO] Spark Project REPL . SUCCESS [ 40.488 s]
[INFO] Spark Project YARN . SUCCESS [ 38.793 s]
[INFO] Spark Project Assembly . SUCCESS [01:10 min]
[INFO] Spark Project External Twitter . SUCCESS [ 14.907 s]
[INFO] Spark Project External Flume Sink .. SUCCESS [ 21.748 s]
[INFO] Spark Project External Flume ... SUCCESS [ 31.754 s]
[INFO] Spark Project External MQTT  SUCCESS [ 17.921 s]
[INFO] Spark Project External ZeroMQ .. SUCCESS [ 18.037 s]
[INFO] Spark Project External Kafka ... SUCCESS [ 41.941 s]
[INFO] Spark Project Examples . SUCCESS [01:56 min]
[INFO] Spark Project External Kafka Assembly .. SUCCESS [ 24.806 s]
[INFO] Spark Project YARN Shuffle Service . SUCCESS [  5.204 s]
[INFO] 
[INFO] BUILD SUCCESS
[INFO] 
[INFO] Total time: 22:40 min
[INFO] Finished at: 2015-07-20T12:54:23-07:00
[INFO] Final Memory: 109M/2332M
[INFO]
“
Granted this may be something completely different which is why the next time I 
do a build, I’m going to capture the stderr/stdout to a file. 

Thx for the quick response. 



 On Jul 20, 2015, at 1:11 PM, Ted Yu yuzhih...@gmail.com wrote:
 
 In master (as well as 1.4.1) I don't see hive profile in pom.xml
 
 I do find hive-provided profile, though.
 
 FYI
 
 On Mon, Jul 20, 2015 at 1:05 PM, Dean Wampler deanwamp...@gmail.com 
 mailto:deanwamp...@gmail.com wrote:
 hadoop-2.6 is supported (look for profile XML in the pom.xml file).
 
 For Hive, add -Phive -Phive-thriftserver  (See 
 http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables 
 http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables) 
 for more details.
 
 dean
 
 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition 
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com/
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com http://polyglotprogramming.com/
 
 On Mon, Jul 20, 2015 at 2:55 PM, Michael Segel msegel_had...@hotmail.com 
 mailto:msegel_had...@hotmail.com wrote:
 Sorry, 
 
 Should have sent this to user… 
 
 However… it looks like the docs page may need some editing? 
 
 Thx
 
 -Mike
 
 
 Begin forwarded message:
 
 From: Michael Segel msegel_had...@hotmail.com 
 mailto:msegel_had...@hotmail.com
 Subject: Silly question about building Spark 1.4.1
 Date: July 20, 2015 at 12:26:40 PM MST
 To: d...@spark.apache.org mailto:d...@spark.apache.org
 
 Hi, 
 
 I’m looking at the online docs for building spark 1.4.1 … 
 
 http://spark.apache.org/docs/latest/building-spark.html 
 http://spark.apache.org/docs/latest/building-spark.html 
 
 I was interested in building spark for Scala 2.11 (latest scala) and also 
 for Hive and JDBC support. 
 
 The docs say:
 “
 To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11 
 property:
 dev/change-version-to-2.11.sh
 mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package
 “ 
 So… 
 Is there a reason I shouldn’t build against hadoop-2.6 ? 
 
 If I want to add the Thirft and Hive support, is it possible? 
 Looking at the Scala build, it looks like hive support is being built? 
 (Looking at the stdout messages…)
 Should the docs be updated? Am I missing something? 
 (Dean W. can confirm, I am completely brain dead. ;-) 
 
 Thx
 
 -Mike
 PS. Yes I can probably download a prebuilt image, but I’m a glutton for 
 punishment. ;-) 
 
 
 
 



Re: Joda Time best practice?

2015-07-20 Thread algermissen1971

On 20 Jul 2015, at 23:20, Harish Butani rhbutani.sp...@gmail.com wrote:

 Can you post details on how to reproduce the NPE

Essentially it is like this:

I have a Scala case class that contains a Joda DateTime attribute, and instances of this class 
are updated using updateStateByKey. When a certain condition is reached, the instances are 
turned into JSON (using spray-json) and stored in Elasticsearch. Just before creating the JSON 
I call getYear on the date attribute, and that fails with an NPE.

When I insert a getMillis or toString call right before the getYear, these work just 
fine.

Jan
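
A minimal sketch of the millis-based workaround (class and field names are made up): keep the
timestamp as a Long in the state class and rebuild a DateTime only at the point of use, so its
chronology is freshly initialised.

import org.joda.time.DateTime

case class SessionState(userId: String, startedAtMillis: Long)

def startYear(s: SessionState): Int =
  new DateTime(s.startedAtMillis).getYear   // fresh DateTime, fully initialised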

 
 On Mon, Jul 20, 2015 at 1:19 PM, algermissen1971 algermissen1...@icloud.com 
 wrote:
 Hi Harish,
 
 On 20 Jul 2015, at 20:37, Harish Butani rhbutani.sp...@gmail.com wrote:
 
  Hey Jan,
 
  Can you provide more details on the serialization and cache issues.
 
 My symptom is that I have a Joda DateTime on which I can call toString and 
 getMillis without problems, but when I call getYear I get a NPE out of the 
 internal AbstractDateTime. Totally strange but seems to align with issues 
 others have.
 
 I am now changing the app to work with millis internally, as that seems to be 
 a performance improvement regarding serialization anyhow.
 
 Thanks,
 
 Jan
 
 
 
  If you are looking for datetime functionality with spark-sql please 
  consider:  https://github.com/SparklineData/spark-datetime It provides a 
  simple way to combine joda datetime expressions with spark sql.
 
  regards,
  Harish.
 
  On Mon, Jul 20, 2015 at 7:37 AM, algermissen1971 
  algermissen1...@icloud.com wrote:
  Hi,
 
  I am having trouble with Joda Time in a Spark application and saw by now 
  that I am not the only one (generally seems to have to do with 
  serialization and internal caches of the Joda Time objects).
 
  Is there a known best practice to work around these issues?
 
  Jan
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Broadcast variables in R

2015-07-20 Thread Eskilson,Aleksander
Hi Serge,

The broadcast function was made private when SparkR merged into Apache
Spark for the 1.4.0 release. You can still use broadcast by specifying the
private namespace though.

SparkR:::broadcast(sc, obj)

The RDD methods were considered very low-level, and the SparkR devs are
still figuring out which of them they'd like to expose along with the
higher-level DataFrame API. You can see the rationale for the decision on
the project JIRA [1].

[1] -- https://issues.apache.org/jira/browse/SPARK-7230

Hope that helps,
Alek

On 7/20/15, 12:00 PM, Serge Franchois serge.franch...@altran.com wrote:

I've searched high and low to use broadcast variables in R.
Is it possible at all? I don't see them mentioned in the SparkR API.
Or is there another way of using this feature?

I need to share a large amount of data between executors.
At the moment, I get warned about my task being too large.

I have tried pyspark, and there I can use them.

Wkr,

Serge




--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-in
-R-tp23915.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


CONFIDENTIALITY NOTICE This message and any included attachments are from 
Cerner Corporation and are intended only for the addressee. The information 
contained in this message is confidential and may constitute inside or 
non-public information under international, federal, or state securities laws. 
Unauthorized forwarding, printing, copying, distribution, or use of such 
information is strictly prohibited and may be unlawful. If you are not the 
addressee, please promptly delete this message and notify the sender of the 
delivery error by e-mail or you may call Cerner's corporate offices in Kansas 
City, Missouri, U.S.A at (+1) (816)221-1024.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: dataframes sql order by not total ordering

2015-07-20 Thread Michael Armbrust
An ORDER BY needs to be on the outermost query otherwise subsequent
operations (such as the join) could reorder the tuples.
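
A sketch of the rewritten query with the ORDER BY moved to the outermost level (same tables
and columns as in the original post):

val results = sqlContext.sql("""
  SELECT movies.title, movierates.maxr, movierates.minr, movierates.cntu
  FROM (SELECT ratings.product,
               max(ratings.rating) AS maxr,
               min(ratings.rating) AS minr,
               count(DISTINCT user) AS cntu
        FROM ratings
        GROUP BY ratings.product) movierates
  JOIN movies ON movierates.product = movies.movieId
  ORDER BY movierates.cntu DESC""")

results.take(30).foreach(println)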

On Mon, Jul 20, 2015 at 9:25 AM, Carol McDonald cmcdon...@maprtech.com
wrote:

 the following query on the Movielens dataset , is sorting by the count of
 ratings for a movie.  It looks like the results  are ordered  by partition
 ?
 scala val results =sqlContext.sql(select movies.title, movierates.maxr,
 movierates.minr, movierates.cntu from(SELECT ratings.product,
 max(ratings.rating) as maxr, min(ratings.rating) as minr,count(distinct
 user) as cntu FROM ratings group by ratings.product order by cntu desc)
 movierates join movies on movierates.product=movies.movieId )

 scala results.take(30).foreach(println)
 [Right Stuff, The (1983),5.0,1.0,750]
 [Lost in Space (1998),5.0,1.0,667]
 [Dumb  Dumber (1994),5.0,1.0,660]
 [Patch Adams (1998),5.0,1.0,474]
 [Carlito's Way (1993),5.0,1.0,369]
 [Rounders (1998),5.0,1.0,345]
 [Bedknobs and Broomsticks (1971),5.0,1.0,319]
 [Beverly Hills Ninja (1997),5.0,1.0,232]
 [Saving Grace (2000),5.0,1.0,186]
 [Dangerous Minds (1995),5.0,1.0,141]
 [Death Wish II (1982),5.0,1.0,85]
 [All Dogs Go to Heaven 2 (1996),5.0,1.0,75]
 [Repossessed (1990),4.0,1.0,53]
 [Assignment, The (1997),5.0,1.0,49]
 [$1,000,000 Duck (1971),5.0,1.0,37]
 [Stonewall (1995),5.0,1.0,20]
 [Dog of Flanders, A (1999),5.0,1.0,8]
 [Frogs for Snakes (1998),3.0,1.0,5]
 [It's in the Water (1998),3.0,2.0,3]
 [Twelve Monkeys (1995),5.0,1.0,1511]
 [Ransom (1996),5.0,1.0,564]
 [Alice in Wonderland (1951),5.0,1.0,525]
 [City Slickers II: The Legend of Curly's Gold (1994),5.0,1.0,392]
 [Eat Drink Man Woman (1994),5.0,1.0,346]
 [Cube (1997),5.0,1.0,233]
 [Omega Man, The (1971),5.0,1.0,224]
 [Stepmom (1998),5.0,1.0,146]
 [Metro (1997),5.0,1.0,100]
 [Death Wish 3 (1985),5.0,1.0,72]
 [Stalker (1979),5.0,1.0,52]



Re: Silly question about building Spark 1.4.1

2015-07-20 Thread Dean Wampler
hadoop-2.6 is supported (look for profile XML in the pom.xml file).

For Hive, add -Phive -Phive-thriftserver  (See
http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables)
for more details.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Jul 20, 2015 at 2:55 PM, Michael Segel msegel_had...@hotmail.com
wrote:

 Sorry,

 Should have sent this to user…

 However… it looks like the docs page may need some editing?

 Thx

 -Mike


 Begin forwarded message:

 *From: *Michael Segel msegel_had...@hotmail.com
 *Subject: **Silly question about building Spark 1.4.1*
 *Date: *July 20, 2015 at 12:26:40 PM MST
 *To: *d...@spark.apache.org

 Hi,

 I’m looking at the online docs for building spark 1.4.1 …

 http://spark.apache.org/docs/latest/building-spark.html

 I was interested in building spark for Scala 2.11 (latest scala) and also
 for Hive and JDBC support.

 The docs say:
 “
 To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11
 property:

 dev/change-version-to-2.11.sh
 mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

 “

 So…

 Is there a reason I shouldn’t build against hadoop-2.6 ?


 If I want to add Thrift and Hive support, is it possible?

 Looking at the Scala build, it looks like hive support is being built?

 (Looking at the stdout messages…)

 Should the docs be updated? Am I missing something?

 (Dean W. can confirm, I am completely brain dead. ;-)


 Thx


 -Mike

 PS. Yes I can probably download a prebuilt image, but I’m a glutton for 
 punishment. ;-)






Re: Silly question about building Spark 1.4.1

2015-07-20 Thread Ted Yu
In master (as well as 1.4.1) I don't see a hive profile in pom.xml.

I do find a hive-provided profile, though.

FYI

On Mon, Jul 20, 2015 at 1:05 PM, Dean Wampler deanwamp...@gmail.com wrote:

 hadoop-2.6 is supported (look for profile XML in the pom.xml file).

 For Hive, add -Phive -Phive-thriftserver  (See
 http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables)
 for more details.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Jul 20, 2015 at 2:55 PM, Michael Segel msegel_had...@hotmail.com
 wrote:

 Sorry,

 Should have sent this to user…

 However… it looks like the docs page may need some editing?

 Thx

 -Mike


 Begin forwarded message:

 *From: *Michael Segel msegel_had...@hotmail.com
 *Subject: **Silly question about building Spark 1.4.1*
 *Date: *July 20, 2015 at 12:26:40 PM MST
 *To: *d...@spark.apache.org

 Hi,

 I’m looking at the online docs for building spark 1.4.1 …

 http://spark.apache.org/docs/latest/building-spark.html

 I was interested in building spark for Scala 2.11 (latest scala) and also
 for Hive and JDBC support.

 The docs say:
 “
 To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11
 property:

 dev/change-version-to-2.11.sh
 mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

 “

 So…

 Is there a reason I shouldn’t build against hadoop-2.6 ?


 If I want to add Thrift and Hive support, is it possible?

 Looking at the Scala build, it looks like hive support is being built?

 (Looking at the stdout messages…)

 Should the docs be updated? Am I missing something?

 (Dean W. can confirm, I am completely brain dead. ;-)


 Thx


 -Mike

 PS. Yes I can probably download a prebuilt image, but I’m a glutton for 
 punishment. ;-)







Re: Joda Time best practice?

2015-07-20 Thread algermissen1971
Hi Harish,

On 20 Jul 2015, at 20:37, Harish Butani rhbutani.sp...@gmail.com wrote:

 Hey Jan,
 
 Can you provide more details on the serialization and cache issues.

My symptom is that I have a Joda DateTime on which I can call toString and 
getMillis without problems, but when I call getYear I get an NPE out of the 
internal AbstractDateTime. Totally strange, but it seems to align with issues 
others have reported.

I am now changing the app to work with millis internally, as that seems to be a 
performance improvement regarding serialization anyhow.
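
A minimal sketch of that approach, with made-up class and field names: only
primitive millis cross the wire, and a DateTime is rebuilt locally where
calendar fields are needed.

    import org.joda.time.{DateTime, DateTimeZone}

    case class Event(id: String, tsMillis: Long)      // only a Long gets serialized

    // events: RDD[Event] (assumed); the DateTime is rebuilt on the executor side
    val years = events.map { e =>
      val dt = new DateTime(e.tsMillis, DateTimeZone.UTC)
      (e.id, dt.getYear)
    }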

Thanks,

Jan


 
 If you are looking for datetime functionality with spark-sql please consider: 
  https://github.com/SparklineData/spark-datetime It provides a simple way to 
 combine joda datetime expressions with spark sql. 
 
 regards,
 Harish.
 
 On Mon, Jul 20, 2015 at 7:37 AM, algermissen1971 algermissen1...@icloud.com 
 wrote:
 Hi,
 
 I am having trouble with Joda Time in a Spark application and saw by now that 
 I am not the only one (generally seems to have to do with serialization and 
 internal caches of the Joda Time objects).
 
 Is there a known best practice to work around these issues?
 
 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Silly question about building Spark 1.4.1

2015-07-20 Thread Michael Segel
Sorry, 

Should have sent this to user… 

However… it looks like the docs page may need some editing? 

Thx

-Mike


 Begin forwarded message:
 
 From: Michael Segel msegel_had...@hotmail.com
 Subject: Silly question about building Spark 1.4.1
 Date: July 20, 2015 at 12:26:40 PM MST
 To: d...@spark.apache.org
 
 Hi, 
 
 I’m looking at the online docs for building spark 1.4.1 … 
 
 http://spark.apache.org/docs/latest/building-spark.html 
 http://spark.apache.org/docs/latest/building-spark.html 
 
 I was interested in building spark for Scala 2.11 (latest scala) and also for 
 Hive and JDBC support. 
 
 The docs say:
 “
 To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11 
 property:
 dev/change-version-to-2.11.sh
 mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package
 “ 
 So… 
 Is there a reason I shouldn’t build against hadoop-2.6 ? 
 
 If I want to add Thrift and Hive support, is it possible? 
 Looking at the Scala build, it looks like hive support is being built? 
 (Looking at the stdout messages…)
 Should the docs be updated? Am I missing something? 
 (Dean W. can confirm, I am completely brain dead. ;-) 
 
 Thx
 
 -Mike
 PS. Yes I can probably download a prebuilt image, but I’m a glutton for 
 punishment. ;-) 
 



Re: Web UI Links

2015-07-20 Thread Bob Corsaro
I figured this out after spelunking the UI code a little. The trick is to
set the SPARK_PUBLIC_DNS environment variable to the public DNS name of
each server in the cluster, per node. I'm running in standalone mode, so it
was just a matter of adding the setting to spark-env.sh.
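
For reference, the change amounts to one line per node in conf/spark-env.sh
(the hostname below is a placeholder for that node's externally resolvable
name):

    # conf/spark-env.sh
    export SPARK_PUBLIC_DNS=node1.example.com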

On Mon, Jul 20, 2015 at 9:59 AM Bob Corsaro rcors...@gmail.com wrote:

 I'm running a spark cluster and I'd like to access the Spark-UI from
 outside the LAN. The problem is all the links are to internal IP addresses.
 Is there anyway to config hostnames for each of the hosts in the cluster
 and use those for the links?



Re: How to restart Twitter spark stream

2015-07-20 Thread Zoran Jeremic
Thanks for the explanation.

If I understand this correctly, in this approach I would actually stream
everything from Twitter and perform the filtering in my application using
Spark. Isn't this too much overhead if my application only needs to listen
for a few hundred or a few thousand hashtags?
On one hand, this is a better approach since I will not have the problem of
opening new streams if the number of hashtags goes over 400, which is the
Twitter limit for user stream filtering; but on the other hand, I'm concerned
about how much it will affect application performance if I stream everything
that is posted on Twitter and filter it locally. It would be great if somebody
with experience on this could comment on these concerns.

Thanks,
Zoran

On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Jorn meant something like this:

 val filteredStream = twitterStream.transform(rdd => {

 val newRDD =
 scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))

 rdd.join(newRDD)

 })

 newRDD will work like a filter when you do the join.


 Thanks
 Best Regards
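
Fleshing that sketch out, a minimal version might look like the following,
assuming the stream carries twitter4j.Status objects and the file holds one
lowercase hashtag per line (no leading '#'):

    val filteredStream = twitterStream.transform { rdd =>
      // reload the current hashtag list once per batch
      val tags = rdd.sparkContext
        .textFile("/this/file/will/be/updated/frequently")
        .map(tag => (tag.toLowerCase, ()))
      // key each status by its hashtags and keep only those present in the list
      rdd.flatMap(status => status.getHashtagEntities.map(h => (h.getText.toLowerCase, status)))
        .join(tags)
        .map { case (_, (status, _)) => status }
    }

The hashtag file is re-read once per batch interval, so updating it on disk
changes the filter without restarting the stream.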

 On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic zoran.jere...@gmail.com
 wrote:

 Hi Jorn,

 I didn't know that it is possible to change filter without re-opening
 twitter stream. Actually, I already had that question earlier at the
 stackoverflow
 http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming
 and I got the answer that it's not possible, but it would be even better if
 there is some other way to add new hashtags or to remove old hashtags that
 user stopped following. I guess the second request would be more difficult.

 However, it would be great if you can give me some short example how to
 make this. I didn't understand well from your explanation what you mean by
 join it with a rdd loading the newest hash tags from disk in a regular
 interval.

 Thanks,
 Zoran

 On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke jornfra...@gmail.com
 wrote:

 Why do you even want to stop it? You can join it with a rdd loading the
 newest hash tags from disk in a regular interval

 Le dim. 19 juil. 2015 à 7:40, Zoran Jeremic zoran.jere...@gmail.com a
 écrit :

 Hi,

 I have a twitter spark stream initialized in the following way:

   val ssc:StreamingContext =
 SparkLauncher.getSparkScalaStreamingContext()
   val config = getTwitterConfigurationBuilder.build()
   val auth: Option[twitter4j.auth.Authorization] =
 Some(new
 twitter4j.auth.OAuthAuthorization(config))
   val stream = TwitterUtils.createStream(ssc, auth,
 filters)


 This works fine when I initialy start it. However, at some point I need
 to update filters since users might add new hashtags they want to follow. I
 tried to stop the running stream and spark streaming context without
 stoping spark context, e.g:


stream.stop()
ssc.stop(false)


 Afterward, I'm trying to initialize a new Twitter stream like I did
 previously. However, I got this exception:

 Exception in thread Firestorm JMX Monitor
 java.lang.IllegalStateException: Adding new inputs, transformations, and
 output operations after stopping a context is not supported
 at
 org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
 at
 org.apache.spark.streaming.dstream.DStream.init(DStream.scala:64)
 at
 org.apache.spark.streaming.dstream.InputDStream.init(InputDStream.scala:41)
 at
 org.apache.spark.streaming.dstream.ReceiverInputDStream.init(ReceiverInputDStream.scala:41)
 at
 org.apache.spark.streaming.twitter.TwitterInputDStream.init(TwitterInputDStream.scala:46)
 at
 org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
 at
 org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
 at
 org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
 at
 org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
 at
 org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
 at
 org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
 at java.util.TimerThread.mainLoop(Timer.java:555)
 at java.util.TimerThread.run(Timer.java:505)
  INFO[2015-07-18 22:24:23,430] [Twitter Stream
 consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl
 (SLF4JLogger.java:83) Inflater has been closed
 ERROR[2015-07-18 22:24:32,503]
 [sparkDriver-akka.actor.default-dispatcher-3]
 streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error
 stopping receiver
 0org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)




 Anybody can explain how to solve 

Re: Spark-hive parquet schema evolution

2015-07-20 Thread Jerrick Hoang
I'm new to Spark; any ideas would be much appreciated! Thanks

On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 Hi all,

 I'm aware of the support for schema evolution via DataFrame API. Just
 wondering what would be the best way to go about dealing with schema
 evolution with Hive metastore tables. So, say I create a table via SparkSQL
 CLI, how would I deal with Parquet schema evolution?

 Thanks,
 J



Spark 1.4.1, MySQL and DataFrameReader.read.jdbc fun

2015-07-20 Thread Aaron
I have Spark 1.4.1 running on a YARN cluster. When I launch pyspark
in yarn-client mode:

pyspark --jars ~/dev/spark/lib/mysql-connector-java-5.1.36-bin.jar
--driver-class-path
~/dev/spark/lib/mysql-connector-java-5.1.36-bin.jar

and then do the equivalent of:

tbl = sqlContext.read.jdbc("jdbc:mysql://", tableName,
properties={"user": "blah", "password": "pw"})


I get the "No suitable driver found" error when I attempt to do a
tbl.show() or a tbl.describe(), etc. This happens in the
spark-shell too.


Currently I do NOT use the SPARK_CLASSPATH (as I've seen that talked
about and knowing that it is deprecated).  I also do NOT set the
spark.executor.extraClassPath property because I thought that was the
whole point of --jars option.

So, do I need to deploy the mysql connector to a known location on my
YARN node managers, and then reference that JAR location someplace?
If so, what cmd line options do I use, or properties do I set?

I thought the --jars cmd line option put the JARs onto the class path
to be used; is this not the case?

Another question: why do I need --driver-class-path with the location of
the mysql jar? If I don't use this cmd line option, I get an error just
attempting the sqlContext.read.jdbc() assignment, without even trying to
perform an operation on the RDD.


Cheers,
Aaron

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Counting distinct values for a key?

2015-07-20 Thread N B
Hi Jerry,

In fact, the HashSet approach is what we took earlier. However, it did not
work with a windowed DStream (i.e. when we provide both a forward and an
inverse reduce operation). The reason is that the inverse reduce tries to
remove values that may still exist elsewhere in the window and should not
have been removed. We discovered the logic error recently; basically, this
is not an invertible function.

I thought of doing this without an inverse reduce function, but even over a
15-minute window that is going to be an expensive operation.

Instead, I have put in place a solution using a HashMap where I keep the
actual counts around, and in the inverse reduce we decrement the counts. At
the end we remove the keys whose value is 0 and then take the size of the
map (i.e. how many keys still have non-zero counts). This gives us the
distinct count.
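
For the record, a sketch of that bookkeeping (assuming a DStream[(String, String)]
named pairs, a 15-minute window sliding every 30 seconds, and checkpointing
enabled, which the inverse-reduce form requires):

    import org.apache.spark.streaming.{Minutes, Seconds}

    val valueCounts = pairs
      .map { case (k, v) => (k, Map(v -> 1L)) }
      .reduceByKeyAndWindow(
        // forward reduce: merge the per-value counts
        (a: Map[String, Long], b: Map[String, Long]) =>
          b.foldLeft(a) { case (m, (v, c)) => m.updated(v, m.getOrElse(v, 0L) + c) },
        // inverse reduce: decrement and drop values whose count reaches zero
        (a: Map[String, Long], b: Map[String, Long]) =>
          b.foldLeft(a) { case (m, (v, c)) =>
            val left = m.getOrElse(v, 0L) - c
            if (left <= 0L) m - v else m.updated(v, left)
          },
        Minutes(15), Seconds(30))

    // distinct count per key = number of values still present in the map
    val distinctPerKey = valueCounts.mapValues(_.size)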

I was hoping that there is a more straightforward way of doing this within
Spark itself without having to resort to a hack like this.

Thanks
Nikunj



On Sun, Jul 19, 2015 at 6:13 PM, Jerry Lam chiling...@gmail.com wrote:

 Hi Nikunj,

 Sorry, I totally misread your question.
 I think you need to first groupbykey (get all values of the same key
 together), then follow by mapValues (probably put the values into a set and
 then take the size of it because you want a distinct count)

 HTH,

 Jerry

 Sent from my iPhone

 On 19 Jul, 2015, at 8:48 pm, N B nb.nos...@gmail.com wrote:

 Hi Suyog,

 That code outputs the following:

 key2 val22 : 1
 key1 val1 : 2
 key2 val2 : 2

 while the output I want to achieve would have been (with your example):

 key1 : 2
 key2 : 2

 because there are 2 distinct types of values for each key ( regardless of
 their actual duplicate counts .. hence the use of the DISTINCT keyword in
 the query equivalent ).

 Thanks
 Nikunj


 On Sun, Jul 19, 2015 at 2:37 PM, suyog choudhari suyogchoudh...@gmail.com
  wrote:

 public static void main(String[] args) {

  SparkConf sparkConf = new SparkConf().setAppName("CountDistinct");

  JavaSparkContext jsc = new JavaSparkContext(sparkConf);

   List<Tuple2<String, String>> list = new ArrayList<Tuple2<String,
 String>>();

   list.add(new Tuple2<String, String>("key1", "val1"));

  list.add(new Tuple2<String, String>("key1", "val1"));

  list.add(new Tuple2<String, String>("key2", "val2"));

  list.add(new Tuple2<String, String>("key2", "val2"));

  list.add(new Tuple2<String, String>("key2", "val22"));

 JavaPairRDD<String, Integer> rdd = jsc.parallelize(list).mapToPair(t
 -> new Tuple2<String, Integer>(t._1 + " " + t._2, 1));

   JavaPairRDD<String, Integer> rdd2 = rdd.reduceByKey((c1, c2) -> c1 + c2
 );

 List<Tuple2<String, Integer>> output = rdd2.collect();

   for (Tuple2<?,?> tuple : output) {

 System.out.println(tuple._1() + " : " + tuple._2());

 }

   }

 On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam chiling...@gmail.com wrote:

 You mean this does not work?

 SELECT key, count(value) from table group by key



 On Sun, Jul 19, 2015 at 2:28 PM, N B nb.nos...@gmail.com wrote:

 Hello,

 How do I go about performing the equivalent of the following SQL clause
 in Spark Streaming? I will be using this on a Windowed DStream.

 SELECT key, count(distinct(value)) from table group by key;

 so for example, given the following dataset in the table:

  key | value
 -+---
  k1  | v1
  k1  | v1
  k1  | v2
  k1  | v3
  k1  | v3
  k2  | vv1
  k2  | vv1
  k2  | vv2
  k2  | vv2
  k2  | vv2
  k3  | vvv1
  k3  | vvv1

 the result will be:

  key | count
 -+---
  k1  | 3
  k2  | 2
  k3  | 1

 Thanks
 Nikunj







Is SPARK is the right choice for traditional OLAP query processing?

2015-07-20 Thread renga.kannan
All,
I really appreciate anyone's input on this. We have a very simple,
traditional OLAP query processing use case. Our use case is as follows.


1. We have customer sales order data coming from an RDBMS table.
2. There are many dimension columns in the sales order table. For each of
those dimensions, we have individual dimension tables that store the
dimension record sets.
3. We also have some BI-like hierarchies defined for the dimension data
sets.

What we want for business users is as follows:

1. We want to show some aggregated values from sales order transaction
table columns.
2. Users would like to filter these by specific dimension values from the
dimension tables.
3. Users should be able to drill down from a higher level to a lower level
by traversing a hierarchy on a dimension.


We want these user actions to respond within 2 to 5 seconds.


We are thinking about using SPARK as our backend engine to serve data to
these front-end applications.


Has anyone tried using SPARK for these kinds of use cases? These are all
traditional use cases in the BI space. If so, can SPARK respond to these
queries within 2 to 5 seconds for large data sets?

Thanks,
Renga



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-SPARK-is-the-right-choice-for-traditional-OLAP-query-processing-tp23921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Kmeans Labeled Point RDD

2015-07-20 Thread Mohammed Guller
I responded to your question on SO. Let me know if this is what you wanted. 

http://stackoverflow.com/a/31528274/2336943


Mohammed

-Original Message-
From: plazaster [mailto:michaelplaz...@gmail.com] 
Sent: Sunday, July 19, 2015 11:38 PM
To: user@spark.apache.org
Subject: Re: Kmeans Labeled Point RDD

Has there been any progress on this? I am in the same boat.

I posted a similar question to Stack Exchange.

http://stackoverflow.com/questions/31447141/spark-mllib-kmeans-from-dataframe-and-back-again



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-Labeled-Point-RDD-tp22989p23907.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Data frames select and where clause dependency

2015-07-20 Thread Mohammed Guller
Michael,
How would the Catalyst optimizer optimize this version?
df.filter(df(filter_field) === value).select(field1).show()
Would it still read all the columns in df or would it read only “filter_field” 
and “field1” since only two columns are used (assuming other columns from df 
are not used anywhere else)?

Mohammed

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Friday, July 17, 2015 1:39 PM
To: Mike Trienis
Cc: user@spark.apache.org
Subject: Re: Data frames select and where clause dependency

Each operation on a dataframe is completely independent and doesn't know what 
operations happened before it.  When you do a selection, you are removing other 
columns from the dataframe and so the filter has nothing to operate on.

On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis 
mike.trie...@orcsol.commailto:mike.trie...@orcsol.com wrote:
I'd like to understand why the where field must exist in the select clause.

For example, the following select statement works fine

  *   df.select(field1, filter_field).filter(df(filter_field) === 
value).show()
However, the next one fails with the error in operator !Filter 
(filter_field#60 = value);

  *   df.select(field1).filter(df(filter_field) === value).show()
As a work-around, it seems that I can do the following

  *   df.select(field1, filter_field).filter(df(filter_field) === 
value).drop(filter_field).show()

Thanks, Mike.



Re: Data frames select and where clause dependency

2015-07-20 Thread Harish Butani
Yes via:  org.apache.spark.sql.catalyst.optimizer.ColumnPruning
See DefaultOptimizer.batches for list of logical rewrites.

You can see the optimized plan by printing: df.queryExecution.optimizedPlan
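
A quick way to check from the shell, written here with literal column names
borrowed from this thread (the value is just a placeholder):

    val q = df.filter(df("filter_field") === "value").select("field1")
    println(q.queryExecution.optimizedPlan)   // inspect which columns survive pruning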

On Mon, Jul 20, 2015 at 5:22 PM, Mohammed Guller moham...@glassbeam.com
wrote:

  Michael,

 How would the Catalyst optimizer optimize this version?

 df.filter(df(filter_field) === value).select(field1).show()

 Would it still read all the columns in df or would it read only
 “filter_field” and “field1” since only two columns are used (assuming other
 columns from df are not used anywhere else)?



 Mohammed



 *From:* Michael Armbrust [mailto:mich...@databricks.com]
 *Sent:* Friday, July 17, 2015 1:39 PM
 *To:* Mike Trienis
 *Cc:* user@spark.apache.org
 *Subject:* Re: Data frames select and where clause dependency



 Each operation on a dataframe is completely independent and doesn't know
 what operations happened before it.  When you do a selection, you are
 removing other columns from the dataframe and so the filter has nothing to
 operate on.



 On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 I'd like to understand why the where field must exist in the select
 clause.



 For example, the following select statement works fine

- df.select(field1, filter_field).filter(df(filter_field) ===
value).show()

  However, the next one fails with the error in operator !Filter
 (filter_field#60 = value);

- df.select(field1).filter(df(filter_field) === value).show()

  As a work-around, it seems that I can do the following

- df.select(field1, filter_field).filter(df(filter_field) ===
value).drop(filter_field).show()



 Thanks, Mike.





RE: Data frames select and where clause dependency

2015-07-20 Thread Mohammed Guller
Thanks, Harish.

Mike – this would be a cleaner version for your use case:
df.filter(df(filter_field) === value).select(field1).show()

Mohammed

From: Harish Butani [mailto:rhbutani.sp...@gmail.com]
Sent: Monday, July 20, 2015 5:37 PM
To: Mohammed Guller
Cc: Michael Armbrust; Mike Trienis; user@spark.apache.org
Subject: Re: Data frames select and where clause dependency

Yes via:  org.apache.spark.sql.catalyst.optimizer.ColumnPruning
See DefaultOptimizer.batches for list of logical rewrites.

You can see the optimized plan by printing: df.queryExecution.optimizedPlan

On Mon, Jul 20, 2015 at 5:22 PM, Mohammed Guller 
moham...@glassbeam.commailto:moham...@glassbeam.com wrote:
Michael,
How would the Catalyst optimizer optimize this version?
df.filter(df(filter_field) === value).select(field1).show()
Would it still read all the columns in df or would it read only “filter_field” 
and “field1” since only two columns are used (assuming other columns from df 
are not used anywhere else)?

Mohammed

From: Michael Armbrust 
[mailto:mich...@databricks.commailto:mich...@databricks.com]
Sent: Friday, July 17, 2015 1:39 PM
To: Mike Trienis
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Data frames select and where clause dependency

Each operation on a dataframe is completely independent and doesn't know what 
operations happened before it.  When you do a selection, you are removing other 
columns from the dataframe and so the filter has nothing to operate on.

On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis 
mike.trie...@orcsol.commailto:mike.trie...@orcsol.com wrote:
I'd like to understand why the where field must exist in the select clause.

For example, the following select statement works fine

  *   df.select(field1, filter_field).filter(df(filter_field) === 
value).show()
However, the next one fails with the error in operator !Filter 
(filter_field#60 = value);

  *   df.select(field1).filter(df(filter_field) === value).show()
As a work-around, it seems that I can do the following

  *   df.select(field1, filter_field).filter(df(filter_field) === 
value).drop(filter_field).show()

Thanks, Mike.




Re: Data frames select and where clause dependency

2015-07-20 Thread Mike Trienis
Definitely, thanks Mohammed.

On Mon, Jul 20, 2015 at 5:47 PM, Mohammed Guller moham...@glassbeam.com
wrote:

  Thanks, Harish.



 Mike – this would be a cleaner version for your use case:

 df.filter(df(filter_field) === value).select(field1).show()



 Mohammed



 *From:* Harish Butani [mailto:rhbutani.sp...@gmail.com]
 *Sent:* Monday, July 20, 2015 5:37 PM
 *To:* Mohammed Guller
 *Cc:* Michael Armbrust; Mike Trienis; user@spark.apache.org

 *Subject:* Re: Data frames select and where clause dependency



 Yes via:  org.apache.spark.sql.catalyst.optimizer.ColumnPruning

 See DefaultOptimizer.batches for list of logical rewrites.



 You can see the optimized plan by printing: df.queryExecution.optimizedPlan



 On Mon, Jul 20, 2015 at 5:22 PM, Mohammed Guller moham...@glassbeam.com
 wrote:

 Michael,

 How would the Catalyst optimizer optimize this version?

 df.filter(df(filter_field) === value).select(field1).show()

 Would it still read all the columns in df or would it read only
 “filter_field” and “field1” since only two columns are used (assuming other
 columns from df are not used anywhere else)?



 Mohammed



 *From:* Michael Armbrust [mailto:mich...@databricks.com]
 *Sent:* Friday, July 17, 2015 1:39 PM
 *To:* Mike Trienis
 *Cc:* user@spark.apache.org
 *Subject:* Re: Data frames select and where clause dependency



 Each operation on a dataframe is completely independent and doesn't know
 what operations happened before it.  When you do a selection, you are
 removing other columns from the dataframe and so the filter has nothing to
 operate on.



 On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis mike.trie...@orcsol.com
 wrote:

 I'd like to understand why the where field must exist in the select
 clause.



 For example, the following select statement works fine

- df.select(field1, filter_field).filter(df(filter_field) ===
value).show()

  However, the next one fails with the error in operator !Filter
 (filter_field#60 = value);

- df.select(field1).filter(df(filter_field) === value).show()

  As a work-around, it seems that I can do the following

- df.select(field1, filter_field).filter(df(filter_field) ===
value).drop(filter_field).show()



 Thanks, Mike.







Re: Increment counter variable in RDD transformation function

2015-07-20 Thread Ted Yu
Please see
http://spark.apache.org/docs/latest/programming-guide.html#local-vs-cluster-modes

Cheers

On Mon, Jul 20, 2015 at 3:21 PM, dlmar...@comcast.net wrote:



 I’m trying to keep track of some information in a RDD.flatMap() function
 (using Java API in 1.4.0). I have two longs in the function, and I am
 incrementing them when appropriate, and checking their values to determine
 how many objects to output from the function. I’m not trying to read the
 values in the driver or use them as a global counter, just trying to count
 within the task. This appears not to be working. Should I expect this to
 work? If so, any pointers as to what I might be doing wrong? Code looks
 something like:



  JavaRDD<LabeledPoint> parsedData = data.map(new Function<String,
  LabeledPoint>() {

  Long count = 0L;

  public LabeledPoint call(String line) {

count++;

    String[] parts = line.split(",");

    String[] features = parts[1].split(" ");

double[] v = new double[features.length];

    for (int i = 0; i < features.length - 1; i++)

  v[i] = Double.parseDouble(features[i]);

if (count == 50) {

  //return something else

}

return new LabeledPoint(Double.parseDouble(parts[0]),
 Vectors.dense(v));

  }

}

 );
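
One way to keep such a counter as plain task-local state is to move the work
into mapPartitions, so a single task owns the variable for its whole
partition. A sketch in Scala, with hypothetical parse helpers standing in for
the parsing above:

    val parsedData = data.mapPartitions { lines =>
      var count = 0L                // local to this task; never shipped anywhere
      lines.map { line =>
        count += 1
        if (count == 50) parseSomethingElse(line)  // hypothetical special case
        else parse(line)                           // hypothetical String => LabeledPoint
      }
    }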





spark streaming 1.3 coalesce on kafkadirectstream

2015-07-20 Thread Shushant Arora
Does spark streaming 1.3 launch a task for each partition offset range
even when that range is empty (0 messages)?

If yes, how can I force it not to launch tasks for empty RDDs? I am not
able to use coalesce on directKafkaStream.

Shall we always enforce repartitioning before processing the direct stream?

use case is :

directKafkaStream.repartition(numexecutors).mapPartitions(new
FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, String>(){
...
}

Thanks


standalone to connect mysql

2015-07-20 Thread Jack Yang
Hi there,

I would like to use Spark to access the data in MySQL. First, I tried to
run the program using:
spark-submit --class sparkwithscala.SqlApp --driver-class-path 
/home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar

That returns the correct results. Then I tried the standalone version using:
spark-submit --class sparkwithscala.SqlApp --driver-class-path 
/home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 
/home/myjar.jar
(I have mysql-connector-java-5.1.34.jar on all the worker nodes.)
The error is:

Exception in thread main org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost 
task 0.3 in stage 0.0 (TID 3, 192.168.157.129): java.sql.SQLException: No 
suitable driver found for 
jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root

I also found a similar problem reported earlier in 
https://jira.talendforge.org/browse/TBD-2244.

Is this a bug to be fixed later, or am I missing something?



Best regards,
Jack
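
For anyone hitting the same error, one experiment (standard spark-submit
flags, not a confirmed fix) is to also put the connector on the executors'
classpath explicitly, reusing the path from the post above:

    spark-submit --class sparkwithscala.SqlApp \
      --master spark://hadoop1:7077 \
      --jars /home/lib/mysql-connector-java-5.1.34.jar \
      --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar \
      --conf spark.executor.extraClassPath=/home/lib/mysql-connector-java-5.1.34.jar \
      /home/myjar.jar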