Re: How to save a string to a text file ?

2015-08-14 Thread go canal
thank you very much. just a quick question - I try to save the string this way
but the file is always empty:

    val file = Path("sample data/ZN_SPARK.OUT").createFile(true)
    file.bufferedWriter().write(im.toString())
    file.bufferedWriter().flush()
    file.bufferedWriter().close()

anything wrong ? thanks, canal
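
A likely cause, for what it's worth: it looks like each call to file.bufferedWriter()
hands back a separate writer, so the writer that received the write() is never the one
that gets flushed or closed. A minimal sketch with plain java.io, assuming im is the
DenseMatrix/string from the question:

import java.io.{BufferedWriter, FileWriter}

val writer = new BufferedWriter(new FileWriter("sample data/ZN_SPARK.OUT"))
try {
  writer.write(im.toString())   // im: the value to persist, as in the question
} finally {
  writer.close()                // close() also flushes the buffer
}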


On Saturday, August 15, 2015 11:21 AM, Brandon White wrote:

 Convert it to an RDD then save the RDD to a file

val str = "dank memes"
sc.parallelize(List(str)).saveAsTextFile("str.txt")
On Fri, Aug 14, 2015 at 7:50 PM, go canal  wrote:

Hello again,
online resources have sample code for writing RDD to a file, but I
have a simple string, how to save to a text file ? (my data is a DenseMatrix 
actually)
appreciate any help ! thanks, canal



  

Re: How to save a string to a text file ?

2015-08-14 Thread Brandon White
Convert it to an RDD then save the RDD to a file

val str = "dank memes"
sc.parallelize(List(str)).saveAsTextFile("str.txt")

On Fri, Aug 14, 2015 at 7:50 PM, go canal  wrote:

> Hello again,
> online resources have sample code for writing RDD to a file, but I have a
> simple string, how to save to a text file ? (my data is a DenseMatrix
> actually)
>
> appreciate any help !
>
> thanks, canal
>


How to save a string to a text file ?

2015-08-14 Thread go canal
Hello again,
online resources have sample code for writing RDD to a file, but I
have a simple string, how to save to a text file ? (my data is a DenseMatrix 
actually)
appreciate any help ! thanks, canal

Re: Left outer joining big data set with small lookups

2015-08-14 Thread Raghavendra Pandey
In Spark 1.4 there is a parameter to control that. Its default value is 10
MB. So you need to cache your dataframe to hint the size.
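
For reference, the parameter being referred to is presumably
spark.sql.autoBroadcastJoinThreshold (10 MB by default). A rough sketch of raising it
and caching the small lookup so its size is known; the table, column, and DataFrame
names below are placeholders, not from the thread:

// Raise the broadcast threshold to ~100 MB (value is illustrative).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)

// Cache and materialize the small lookup so the optimizer has a size estimate.
val lookup = sqlContext.table("small_lookup").cache()
lookup.count()

// Left outer join of the big fact DataFrame against the lookup.
val joined = factDf.join(lookup, factDf("key") === lookup("key"), "left_outer")
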
On Aug 14, 2015 7:09 PM, "VIJAYAKUMAR JAWAHARLAL" 
wrote:

> Hi
>
> I am facing huge performance problem when I am trying to left outer join
> very big data set (~140GB) with a bunch of small lookups [Star schema type].
> I am using data frames in Spark SQL. It looks like data is shuffled and
> skewed when that join happens. Is there any way to improve performance of
> such type of join in spark?
>
> How can I hint optimizer to go with replicated join etc., to avoid
> shuffle? Would it help to create broadcast variables on small lookups?  If
> I create broadcast variables, how can I convert them into data frame and
> use them in sparksql type of join?
>
> Thanks
> Vijay
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-08-14 Thread Dmitry Goldenberg
Thanks, Cody. It sounds like Spark Streaming has enough state info to know
how many batches have been processed and if not all of them then the RDD is
'unfinished'. I wonder if it would know whether the last micro-batch has
been fully processed successfully. Hypothetically, the driver program could
terminate as the last batch is being processed...

On Fri, Aug 14, 2015 at 6:17 PM, Cody Koeninger  wrote:

> You'll resume and re-process the rdd that didnt finish
>
> On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Our additional question on checkpointing is basically the logistics of it
>> --
>>
>> At which point does the data get written into checkpointing?  Is it
>> written as soon as the driver program retrieves an RDD from Kafka (or
>> another source)?  Or, is it written after that RDD has been processed and
>> we're basically moving on to the next RDD?
>>
>> What I'm driving at is, what happens if the driver program is killed?
>> The next time it's started, will it know, from Spark Streaming's
>> checkpointing, to resume from the same RDD that was being processed at the
>> time of the program getting killed?  In other words, will we, upon
>> restarting the consumer, resume from the RDD that was unfinished, or will
>> we be looking at the next RDD?
>>
>> Will we pick up from the last known *successfully processed* topic
>> offset?
>>
>> Thanks.
>>
>>
>>
>>
>> On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen  wrote:
>>
>>> If you've set the checkpoint dir, it seems like indeed the intent is
>>> to use a default checkpoint interval in DStream:
>>>
>>> private[streaming] def initialize(time: Time) {
>>> ...
>>>   // Set the checkpoint interval to be slideDuration or 10 seconds,
>>> which ever is larger
>>>   if (mustCheckpoint && checkpointDuration == null) {
>>> checkpointDuration = slideDuration * math.ceil(Seconds(10) /
>>> slideDuration).toInt
>>> logInfo("Checkpoint interval automatically set to " +
>>> checkpointDuration)
>>>   }
>>>
>>> Do you see that log message? what's the interval? that could at least
>>> explain why it's not doing anything, if it's quite long.
>>>
>>> It sort of seems wrong though since
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>>> suggests it was intended to be a multiple of the batch interval. The
>>> slide duration wouldn't always be relevant anyway.
>>>
>>> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>>>  wrote:
>>> > I've instrumented checkpointing per the programming guide and I can
>>> tell
>>> > that Spark Streaming is creating the checkpoint directories but I'm not
>>> > seeing any content being created in those directories nor am I seeing
>>> the
>>> > effects I'd expect from checkpointing.  I'd expect any data that comes
>>> into
>>> > Kafka while the consumers are down, to get picked up when the
>>> consumers are
>>> > restarted; I'm not seeing that.
>>> >
>>> > For now my checkpoint directory is set to the local file system with
>>> the
>>> > directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
>>> > subdirectory named with a UUID being created under there but no files.
>>> >
>>> > I'm using a custom JavaStreamingContextFactory which creates a
>>> > JavaStreamingContext with the directory set into it via the
>>> > checkpoint(String) method.
>>> >
>>> > I'm currently not invoking the checkpoint(Duration) method on the
>>> DStream
>>> > since I want to first rely on Spark's default checkpointing interval.
>>> My
>>> > streaming batch duration millis is set to 1 second.
>>> >
>>> > Anyone have any idea what might be going wrong?
>>> >
>>> > Also, at which point does Spark delete files from checkpointing?
>>> >
>>> > Thanks.
>>>
>>
>>
>


Re: QueueStream Does Not Support Checkpointing

2015-08-14 Thread Asim Jalis
Another fix might be to remove the exception that is thrown when windowing
and other stateful operations are used without checkpointing.

On Fri, Aug 14, 2015 at 5:43 PM, Asim Jalis  wrote:

> I feel the real fix here is to remove the exception from QueueInputDStream
> class by reverting the fix of
> https://issues.apache.org/jira/browse/SPARK-8630
>
> I can write another class that is identical to the QueueInputDStream class
> except it does not throw the exception. But this feels like a convoluted
> solution.
>
> Throwing exceptions to forbid behavior in code is risky because it can
> easily break legitimate uses of a class.
>
> Is there a way to reopen https://issues.apache.org/jira/browse/SPARK-8630.
> I have added a comment to it, but I am not sure if that will have that
> effect.
>
> Thanks.
>
> Asim
>
> On Fri, Aug 14, 2015 at 4:03 PM, Holden Karau 
> wrote:
>
>> I just pushed some code that does this for spark-testing-base (
>> https://github.com/holdenk/spark-testing-base )  (its in master) and
>> will publish an updated artifact with it for tonight.
>>
>> On Fri, Aug 14, 2015 at 3:35 PM, Tathagata Das 
>> wrote:
>>
>>> A hacky workaround is to create a customer InputDStream that creates the
>>> right RDDs based on a function. The TestInputDStream
>>> 
>>> does something similar for Spark Streaming unit tests.
>>>
>>> TD
>>>
>>> On Fri, Aug 14, 2015 at 1:04 PM, Asim Jalis  wrote:
>>>
 I want to test some Spark Streaming code that is using
 reduceByKeyAndWindow. If I do not enable checkpointing, I get the error:

 java.lang.IllegalArgumentException: requirement failed: The checkpoint
> directory has not been set. Please set it by 
> StreamingContext.checkpoint().


 But if I enable checkpointing I get

 queueStream doesn't support checkpointing


 Is there a workaround for this?

 My goal is to test that the windowing logic in my code is correct. Is
 there a way to disable these strict checks or a different dstream I can use
 that I can populate programmatically and then use for testing?

 Thanks.

 Asim


>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>> Linked In: https://www.linkedin.com/in/holdenkarau
>>
>
>


Re: QueueStream Does Not Support Checkpointing

2015-08-14 Thread Asim Jalis
I feel the real fix here is to remove the exception from QueueInputDStream
class by reverting the fix of
https://issues.apache.org/jira/browse/SPARK-8630

I can write another class that is identical to the QueueInputDStream class
except it does not throw the exception. But this feels like a convoluted
solution.

Throwing exceptions to forbid behavior in code is risky because it can
easily break legitimate uses of a class.

Is there a way to reopen https://issues.apache.org/jira/browse/SPARK-8630.
I have added a comment to it, but I am not sure if that will have that
effect.

Thanks.

Asim

On Fri, Aug 14, 2015 at 4:03 PM, Holden Karau  wrote:

> I just pushed some code that does this for spark-testing-base (
> https://github.com/holdenk/spark-testing-base )  (its in master) and will
> publish an updated artifact with it for tonight.
>
> On Fri, Aug 14, 2015 at 3:35 PM, Tathagata Das 
> wrote:
>
>> A hacky workaround is to create a customer InputDStream that creates the
>> right RDDs based on a function. The TestInputDStream
>> 
>> does something similar for Spark Streaming unit tests.
>>
>> TD
>>
>> On Fri, Aug 14, 2015 at 1:04 PM, Asim Jalis  wrote:
>>
>>> I want to test some Spark Streaming code that is using
>>> reduceByKeyAndWindow. If I do not enable checkpointing, I get the error:
>>>
>>> java.lang.IllegalArgumentException: requirement failed: The checkpoint
 directory has not been set. Please set it by StreamingContext.checkpoint().
>>>
>>>
>>> But if I enable checkpointing I get
>>>
>>> queueStream doesn't support checkpointing
>>>
>>>
>>> Is there a workaround for this?
>>>
>>> My goal is to test that the windowing logic in my code is correct. Is
>>> there a way to disable these strict checks or a different dstream I can use
>>> that I can populate programmatically and then use for testing?
>>>
>>> Thanks.
>>>
>>> Asim
>>>
>>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
> Linked In: https://www.linkedin.com/in/holdenkarau
>


Re: Spark RuntimeException hadoop output format

2015-08-14 Thread Ted Yu
First you create the file:

final File outputFile = new File(outputPath);

Then you write to it:
Files.append(counts + "\n", outputFile, Charset.defaultCharset());

Cheers

On Fri, Aug 14, 2015 at 4:38 PM, Mohit Anchlia 
wrote:

> I thought prefix meant the output path? What's the purpose of prefix and
> where do I specify the path if not in prefix?
>
> On Fri, Aug 14, 2015 at 4:36 PM, Ted Yu  wrote:
>
>> Please take a look at JavaPairDStream.scala:
>>  def saveAsHadoopFiles[F <: OutputFormat[_, _]](
>>   prefix: String,
>>   suffix: String,
>>   keyClass: Class[_],
>>   valueClass: Class[_],
>>   outputFormatClass: Class[F]) {
>>
>> Did you intend to use outputPath as prefix ?
>>
>> Cheers
>>
>>
>> On Fri, Aug 14, 2015 at 1:36 PM, Mohit Anchlia 
>> wrote:
>>
>>> Spark 1.3
>>>
>>> Code:
>>>
>>> wordCounts.foreachRDD(*new* *Function2,
>>> Time, Void>()* {
>>>
>>> @Override
>>>
>>> *public* Void call(JavaPairRDD rdd, Time time) *throws*
>>> IOException {
>>>
>>> String counts = "Counts at time " + time + " " + rdd.collect();
>>>
>>> System.*out*.println(counts);
>>>
>>> System.*out*.println("Appending to " + outputFile.getAbsolutePath());
>>>
>>> Files.*append*(counts + "\n", outputFile, Charset.*defaultCharset*());
>>>
>>> *return* *null*;
>>>
>>> }
>>>
>>> });
>>>
>>> wordCounts.saveAsHadoopFiles(outputPath, "txt", Text.*class*, Text.
>>> *class*, TextOutputFormat.*class*);
>>>
>>>
>>> What do I need to check in namenode? I see 0 bytes files like this:
>>>
>>>
>>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>>> /tmp/out-1439495124000.txt
>>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>>> /tmp/out-1439495125000.txt
>>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>>> /tmp/out-1439495126000.txt
>>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>>> /tmp/out-1439495127000.txt
>>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>>> /tmp/out-1439495128000.txt
>>>
>>>
>>>
>>> However, I also wrote data to a local file on the local file system for
>>> verification and I see the data:
>>>
>>>
>>> $ ls -ltr !$
>>> ls -ltr /tmp/out
>>> -rw-r--r-- 1 yarn yarn 5230 Aug 13 15:45 /tmp/out
>>>
>>>
>>> On Fri, Aug 14, 2015 at 6:15 AM, Ted Yu  wrote:
>>>
 Which Spark release are you using ?

 Can you show us snippet of your code ?

 Have you checked namenode log ?

 Thanks



 On Aug 13, 2015, at 10:21 PM, Mohit Anchlia 
 wrote:

 I was able to get this working by using an alternative method however I
 only see 0 bytes files in hadoop. I've verified that the output does exist
 in the logs however it's missing from hdfs.

 On Thu, Aug 13, 2015 at 10:49 AM, Mohit Anchlia >>> > wrote:

> I have this call trying to save to hdfs 2.6
>
> wordCounts.saveAsNewAPIHadoopFiles("prefix", "txt");
>
> but I am getting the following:
> java.lang.RuntimeException: class scala.runtime.Nothing$ not
> org.apache.hadoop.mapreduce.OutputFormat
>


>>>
>>
>


Too many files/dirs in hdfs

2015-08-14 Thread Mohit Anchlia
Spark Streaming seems to be creating 0-byte files even when there is no data.
Also, I have 2 concerns here:

1) Extra unnecessary files are being created in the output
2) Hadoop doesn't work really well with too many files, and I see that it is
creating a directory with a timestamp every 1 second. Is there a better way
of writing a file, maybe some kind of append mechanism where one
doesn't have to change the batch interval?


Executors on multiple nodes

2015-08-14 Thread Mohit Anchlia
I am running on Yarn and do have a question on how spark runs executors on
different data nodes. Is that primarily decided based on number of
receivers?

What do I need to do to ensure that multiple nodes are being used for data
processing?


Re: Spark RuntimeException hadoop output format

2015-08-14 Thread Mohit Anchlia
I thought prefix meant the output path? What's the purpose of prefix and
where do I specify the path if not in prefix?

On Fri, Aug 14, 2015 at 4:36 PM, Ted Yu  wrote:

> Please take a look at JavaPairDStream.scala:
>  def saveAsHadoopFiles[F <: OutputFormat[_, _]](
>   prefix: String,
>   suffix: String,
>   keyClass: Class[_],
>   valueClass: Class[_],
>   outputFormatClass: Class[F]) {
>
> Did you intend to use outputPath as prefix ?
>
> Cheers
>
>
> On Fri, Aug 14, 2015 at 1:36 PM, Mohit Anchlia 
> wrote:
>
>> Spark 1.3
>>
>> Code:
>>
>> wordCounts.foreachRDD(*new* *Function2,
>> Time, Void>()* {
>>
>> @Override
>>
>> *public* Void call(JavaPairRDD rdd, Time time) *throws*
>> IOException {
>>
>> String counts = "Counts at time " + time + " " + rdd.collect();
>>
>> System.*out*.println(counts);
>>
>> System.*out*.println("Appending to " + outputFile.getAbsolutePath());
>>
>> Files.*append*(counts + "\n", outputFile, Charset.*defaultCharset*());
>>
>> *return* *null*;
>>
>> }
>>
>> });
>>
>> wordCounts.saveAsHadoopFiles(outputPath, "txt", Text.*class*, Text.
>> *class*, TextOutputFormat.*class*);
>>
>>
>> What do I need to check in namenode? I see 0 bytes files like this:
>>
>>
>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>> /tmp/out-1439495124000.txt
>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>> /tmp/out-1439495125000.txt
>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>> /tmp/out-1439495126000.txt
>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>> /tmp/out-1439495127000.txt
>> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
>> /tmp/out-1439495128000.txt
>>
>>
>>
>> However, I also wrote data to a local file on the local file system for
>> verification and I see the data:
>>
>>
>> $ ls -ltr !$
>> ls -ltr /tmp/out
>> -rw-r--r-- 1 yarn yarn 5230 Aug 13 15:45 /tmp/out
>>
>>
>> On Fri, Aug 14, 2015 at 6:15 AM, Ted Yu  wrote:
>>
>>> Which Spark release are you using ?
>>>
>>> Can you show us snippet of your code ?
>>>
>>> Have you checked namenode log ?
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Aug 13, 2015, at 10:21 PM, Mohit Anchlia 
>>> wrote:
>>>
>>> I was able to get this working by using an alternative method however I
>>> only see 0 bytes files in hadoop. I've verified that the output does exist
>>> in the logs however it's missing from hdfs.
>>>
>>> On Thu, Aug 13, 2015 at 10:49 AM, Mohit Anchlia 
>>> wrote:
>>>
 I have this call trying to save to hdfs 2.6

 wordCounts.saveAsNewAPIHadoopFiles("prefix", "txt");

 but I am getting the following:
 java.lang.RuntimeException: class scala.runtime.Nothing$ not
 org.apache.hadoop.mapreduce.OutputFormat

>>>
>>>
>>
>


Re: Spark RuntimeException hadoop output format

2015-08-14 Thread Ted Yu
Please take a look at JavaPairDStream.scala:
 def saveAsHadoopFiles[F <: OutputFormat[_, _]](
  prefix: String,
  suffix: String,
  keyClass: Class[_],
  valueClass: Class[_],
  outputFormatClass: Class[F]) {

Did you intend to use outputPath as prefix ?

Cheers
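
In other words, the first argument is a prefix rather than a complete path: Spark names
each batch's output directory "<prefix>-<batch time in ms>.<suffix>", which is what the
/tmp/out-1439495124000.txt directories in this thread are. A small Scala sketch, assuming
a DStream of (Text, Text) pairs named wordCounts; the HDFS URI is a placeholder:

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat

// Writes one directory per batch under /tmp/wordcounts, e.g.
// /tmp/wordcounts/out-1439495124000.txt/part-00000
wordCounts.saveAsHadoopFiles[TextOutputFormat[Text, Text]](
  "hdfs://namenode:8020/tmp/wordcounts/out", "txt")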


On Fri, Aug 14, 2015 at 1:36 PM, Mohit Anchlia 
wrote:

> Spark 1.3
>
> Code:
>
> wordCounts.foreachRDD(*new* *Function2,
> Time, Void>()* {
>
> @Override
>
> *public* Void call(JavaPairRDD rdd, Time time) *throws*
> IOException {
>
> String counts = "Counts at time " + time + " " + rdd.collect();
>
> System.*out*.println(counts);
>
> System.*out*.println("Appending to " + outputFile.getAbsolutePath());
>
> Files.*append*(counts + "\n", outputFile, Charset.*defaultCharset*());
>
> *return* *null*;
>
> }
>
> });
>
> wordCounts.saveAsHadoopFiles(outputPath, "txt", Text.*class*, Text.*class*,
> TextOutputFormat.*class*);
>
>
> What do I need to check in namenode? I see 0 bytes files like this:
>
>
> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
> /tmp/out-1439495124000.txt
> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
> /tmp/out-1439495125000.txt
> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
> /tmp/out-1439495126000.txt
> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
> /tmp/out-1439495127000.txt
> drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
> /tmp/out-1439495128000.txt
>
>
>
> However, I also wrote data to a local file on the local file system for
> verification and I see the data:
>
>
> $ ls -ltr !$
> ls -ltr /tmp/out
> -rw-r--r-- 1 yarn yarn 5230 Aug 13 15:45 /tmp/out
>
>
> On Fri, Aug 14, 2015 at 6:15 AM, Ted Yu  wrote:
>
>> Which Spark release are you using ?
>>
>> Can you show us snippet of your code ?
>>
>> Have you checked namenode log ?
>>
>> Thanks
>>
>>
>>
>> On Aug 13, 2015, at 10:21 PM, Mohit Anchlia 
>> wrote:
>>
>> I was able to get this working by using an alternative method however I
>> only see 0 bytes files in hadoop. I've verified that the output does exist
>> in the logs however it's missing from hdfs.
>>
>> On Thu, Aug 13, 2015 at 10:49 AM, Mohit Anchlia 
>> wrote:
>>
>>> I have this call trying to save to hdfs 2.6
>>>
>>> wordCounts.saveAsNewAPIHadoopFiles("prefix", "txt");
>>>
>>> but I am getting the following:
>>> java.lang.RuntimeException: class scala.runtime.Nothing$ not
>>> org.apache.hadoop.mapreduce.OutputFormat
>>>
>>
>>
>


Re: QueueStream Does Not Support Checkpointing

2015-08-14 Thread Holden Karau
I just pushed some code that does this for spark-testing-base (
https://github.com/holdenk/spark-testing-base ) (it's in master) and will
publish an updated artifact with it tonight.

On Fri, Aug 14, 2015 at 3:35 PM, Tathagata Das  wrote:

> A hacky workaround is to create a customer InputDStream that creates the
> right RDDs based on a function. The TestInputDStream
> 
> does something similar for Spark Streaming unit tests.
>
> TD
>
> On Fri, Aug 14, 2015 at 1:04 PM, Asim Jalis  wrote:
>
>> I want to test some Spark Streaming code that is using
>> reduceByKeyAndWindow. If I do not enable checkpointing, I get the error:
>>
>> java.lang.IllegalArgumentException: requirement failed: The checkpoint
>>> directory has not been set. Please set it by StreamingContext.checkpoint().
>>
>>
>> But if I enable checkpointing I get
>>
>> queueStream doesn't support checkpointing
>>
>>
>> Is there a workaround for this?
>>
>> My goal is to test that the windowing logic in my code is correct. Is
>> there a way to disable these strict checks or a different dstream I can use
>> that I can populate programmatically and then use for testing?
>>
>> Thanks.
>>
>> Asim
>>
>>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau


Re: QueueStream Does Not Support Checkpointing

2015-08-14 Thread Tathagata Das
A hacky workaround is to create a custom InputDStream that creates the
right RDDs based on a function. The TestInputDStream

does something similar for Spark Streaming unit tests.

TD
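
A rough sketch of that kind of custom InputDStream (not the actual spark-testing-base or
Spark test code): compute() builds each batch's RDD from a caller-supplied function, and
unlike queueStream it does not refuse checkpointing. Names are illustrative.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream
import scala.reflect.ClassTag

class FunctionInputDStream[T: ClassTag](
    ssc: StreamingContext,
    makeRDD: Time => RDD[T]) extends InputDStream[T](ssc) {

  override def start(): Unit = {}   // nothing to start
  override def stop(): Unit = {}    // nothing to stop

  // Called once per batch interval; return the RDD for that batch.
  override def compute(validTime: Time): Option[RDD[T]] = Some(makeRDD(validTime))
}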

On Fri, Aug 14, 2015 at 1:04 PM, Asim Jalis  wrote:

> I want to test some Spark Streaming code that is using
> reduceByKeyAndWindow. If I do not enable checkpointing, I get the error:
>
> java.lang.IllegalArgumentException: requirement failed: The checkpoint
>> directory has not been set. Please set it by StreamingContext.checkpoint().
>
>
> But if I enable checkpointing I get
>
> queueStream doesn't support checkpointing
>
>
> Is there a workaround for this?
>
> My goal is to test that the windowing logic in my code is correct. Is
> there a way to disable these strict checks or a different dstream I can use
> that I can populate programmatically and then use for testing?
>
> Thanks.
>
> Asim
>
>


Re: worker and executor memory

2015-08-14 Thread James Pirz
Additional Comment:
I checked the disk usage on the 3 nodes (using iostat) and it seems that
reading from HDFS partitions happens on a node-by-node basis. Only one of
the nodes shows active IO (reads) at any given time while the other two
nodes are idle IO-wise. I am not sure why the tasks are scheduled that way,
as it is a map-only job and reading could happen in parallel.

On Thu, Aug 13, 2015 at 9:10 PM, James Pirz  wrote:

> Hi,
>
> I am using Spark 1.4 on a cluster (stand-alone mode), across 3 machines,
> for a workload similar to TPCH (analytical queries with multiple/multi-way
> large joins and aggregations). Each machine has 12GB of Memory and 4 cores.
> My total data size is 150GB, stored in HDFS (stored as Hive tables), and I
> am running my queries through Spark SQL using hive context.
> After checking the performance tuning documents on the spark page and some
> clips from latest spark summit, I decided to set the following configs in
> my spark-env:
>
> SPARK_WORKER_INSTANCES=4
> SPARK_WORKER_CORES=1
> SPARK_WORKER_MEMORY=2500M
>
> (As my tasks tend to be long so the overhead of starting multiple JVMs,
> one per worker is much less than the total query times). As I monitor the
> job progress, I realized that while the Worker memory is 2.5GB, the
> executors (one per worker) have max memory of 512MB (which is default). I
> enlarged this value in my application as:
>
> conf.set("spark.executor.memory", "2.5g");
>
> Trying to give max available memory on each worker to its only executor,
> but I observed that my queries are running slower than the prev case
> (default 512MB). Changing 2.5g to 1g improved the performance time, it is
> close to but still worse than 512MB case. I guess what I am missing here is
> what is the relationship between the "WORKER_MEMORY" and 'executor.memory'.
>
> - Isn't it the case that WORKER tries to split this memory among its
> executors (in my case its only executor) ? Or there are other stuff being
> done worker which need memory ?
>
> - What other important parameters I need to look into and tune at this
> point to get the best response time out of my HW ? (I have read about Kryo
> serializer, and I am about trying that - I am mainly concerned about memory
> related settings and also knobs related to parallelism of my jobs). As an
> example, for a simple scan-only query, Spark is worse than Hive (almost 3
> times slower) while both are scanning the exact same table & file format.
> That is why I believe I am missing some params by leaving them as defaults.
>
> Any hint/suggestion would be highly appreciated.
>
>
>


Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-08-14 Thread Cody Koeninger
You'll resume and re-process the RDD that didn't finish

On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg  wrote:

> Our additional question on checkpointing is basically the logistics of it
> --
>
> At which point does the data get written into checkpointing?  Is it
> written as soon as the driver program retrieves an RDD from Kafka (or
> another source)?  Or, is it written after that RDD has been processed and
> we're basically moving on to the next RDD?
>
> What I'm driving at is, what happens if the driver program is killed?  The
> next time it's started, will it know, from Spark Streaming's checkpointing,
> to resume from the same RDD that was being processed at the time of the
> program getting killed?  In other words, will we, upon restarting the
> consumer, resume from the RDD that was unfinished, or will we be looking at
> the next RDD?
>
> Will we pick up from the last known *successfully processed* topic offset?
>
> Thanks.
>
>
>
>
> On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen  wrote:
>
>> If you've set the checkpoint dir, it seems like indeed the intent is
>> to use a default checkpoint interval in DStream:
>>
>> private[streaming] def initialize(time: Time) {
>> ...
>>   // Set the checkpoint interval to be slideDuration or 10 seconds,
>> which ever is larger
>>   if (mustCheckpoint && checkpointDuration == null) {
>> checkpointDuration = slideDuration * math.ceil(Seconds(10) /
>> slideDuration).toInt
>> logInfo("Checkpoint interval automatically set to " +
>> checkpointDuration)
>>   }
>>
>> Do you see that log message? what's the interval? that could at least
>> explain why it's not doing anything, if it's quite long.
>>
>> It sort of seems wrong though since
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>> suggests it was intended to be a multiple of the batch interval. The
>> slide duration wouldn't always be relevant anyway.
>>
>> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>>  wrote:
>> > I've instrumented checkpointing per the programming guide and I can tell
>> > that Spark Streaming is creating the checkpoint directories but I'm not
>> > seeing any content being created in those directories nor am I seeing
>> the
>> > effects I'd expect from checkpointing.  I'd expect any data that comes
>> into
>> > Kafka while the consumers are down, to get picked up when the consumers
>> are
>> > restarted; I'm not seeing that.
>> >
>> > For now my checkpoint directory is set to the local file system with the
>> > directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
>> > subdirectory named with a UUID being created under there but no files.
>> >
>> > I'm using a custom JavaStreamingContextFactory which creates a
>> > JavaStreamingContext with the directory set into it via the
>> > checkpoint(String) method.
>> >
>> > I'm currently not invoking the checkpoint(Duration) method on the
>> DStream
>> > since I want to first rely on Spark's default checkpointing interval.
>> My
>> > streaming batch duration millis is set to 1 second.
>> >
>> > Anyone have any idea what might be going wrong?
>> >
>> > Also, at which point does Spark delete files from checkpointing?
>> >
>> > Thanks.
>>
>
>


Re: distributing large matrices

2015-08-14 Thread Rob Sargent

@Koen,

If you meant to reply to my question on distributing matrices, could you
re-send, as there was no content in your post?


Thanks,

On 08/07/2015 10:02 AM, Koen Vantomme wrote:


Sent from my Sony Xperia™ smartphone



iceback wrote:

Is this the sort of problem spark can accommodate?

I need to compare 10,000 matrices with each other (10^10 comparison).  The
matrices are 100x10 (10^7 int values).
I have 10 machines with 2 to 8 cores (8-32 "processors").
All machines have to
- contribute to matrices generation (a simulation, takes seconds)
- see all matrices
- compare matrices (takes very little time compared to simulation)

I expect to persist the simulations, have spark push them to processors.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/distributing-large-matrices-tp24174.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
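
For what it's worth, a rough sketch of how this shape of problem can be expressed in
Spark: persist the simulated matrices once, then do the pairwise comparison with
cartesian(). simulate() and compare() stand in for the poster's own logic and the
partition count is illustrative; a raw cartesian over every pair is still a lot of work,
so in practice the comparison is usually blocked or pruned.

val matrices = sc.parallelize(0 until 10000, numSlices = 200)
  .map(id => (id, simulate(id)))                 // simulate(): the expensive per-matrix step
  .persist()                                     // keep results so every comparison reuses them

val scores = matrices.cartesian(matrices)
  .filter { case ((i, _), (j, _)) => i < j }     // each unordered pair once
  .map { case ((i, a), (j, b)) => ((i, j), compare(a, b)) }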







Re: Setting up Spark/flume/? to Ingest 10TB from FTP

2015-08-14 Thread Marcelo Vanzin
On Fri, Aug 14, 2015 at 2:11 PM, Varadhan, Jawahar <
varad...@yahoo.com.invalid> wrote:

> And hence, I was planning to use Spark Streaming with Kafka or Flume with
> Kafka. But flume runs on a JVM and may not be the best option as the huge
> file will create memory issues. Please suggest someway to run it inside the
> cluster.
>

I'm not sure why you think memory would be a problem. You don't need to
read all 10GB into memory to transfer the file.

I'm far from the best person to give advice about Flume, but this seems
like it would be a job more in line with what Sqoop does; although a quick
search seems to indicate Sqoop cannot yet read from FTP.

But writing your own code to read from an FTP server when a message arrives
from Kafka shouldn't really be hard.
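
A very rough sketch of that roll-your-own approach, streaming an FTP file into HDFS
with a fixed-size buffer so the whole file never sits in memory; the URL/path arguments
and the idea that they are parsed out of the Kafka message are assumptions:

import java.net.URL
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

def ftpToHdfs(ftpUrl: String, hdfsPath: String): Unit = {
  // e.g. ftpUrl = "ftp://user:pass@host/big/file", taken from the Kafka message
  val in = new URL(ftpUrl).openStream()
  val fs = FileSystem.get(new Configuration())
  val out = fs.create(new Path(hdfsPath))
  try {
    IOUtils.copyBytes(in, out, 64 * 1024)   // 64 KB buffer, constant memory
  } finally {
    in.close()
    out.close()
  }
}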

-- 
Marcelo


Re: Setting up Spark/flume/? to Ingest 10TB from FTP

2015-08-14 Thread Varadhan, Jawahar
Thanks Marcelo. But our problem is a little more complicated.

We have 10+ FTP sites that we will be transferring data from. The FTP server
info, filename, and credentials all arrive via a Kafka message. So, I want to
read those Kafka messages, dynamically connect to the FTP site, download
those fat files, and store them in HDFS.
And hence, I was planning to use Spark Streaming with Kafka or Flume with
Kafka. But Flume runs in a JVM and may not be the best option, as the huge file
will create memory issues. Please suggest some way to run it inside the cluster.

 

From: Marcelo Vanzin
To: "Varadhan, Jawahar"
Cc: "d...@spark.apache.org"
Sent: Friday, August 14, 2015 3:23 PM
Subject: Re: Setting up Spark/flume/? to Ingest 10TB from FTP

Why do you need to use Spark or Flume for this?
You can just use curl and hdfs:
  curl ftp://blah | hdfs dfs -put - /blah



On Fri, Aug 14, 2015 at 1:15 PM, Varadhan, Jawahar  
wrote:

What is the best way to bring such a huge file from a FTP server into Hadoop to 
persist in HDFS? Since a single jvm process might run out of memory, I was 
wondering if I can use Spark or Flume to do this. Any help on this matter is 
appreciated. 
I prefer a application/process running inside Hadoop which is doing this 
transfer
Thanks.



-- 
Marcelo


   

  

Help with persist: Data is requested again

2015-08-14 Thread Saif.A.Ellafi
Hello all,

I am writing a program which reads from a database. I run a couple of
computations, but in the end I have a while loop, in which I make a
modification to the persisted data, e.g.:

val data = PairRDD... persist()
var i = 0
while (i < 10) {
val data_mod = data.map(_._1 + 1, _._2)
val data_joined = data.join(data_mod)
... do stuff with data_joined
}

Sadly, the shuffle inside the WHILE loop is causing a
JDBC call and that is very slow. It is not finding the data locally.

How can I help myself?
Saif
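
One thing worth checking, as a sketch rather than a definitive diagnosis: persist() is
lazy, so if nothing materializes the RDD before the loop, each iteration can recompute
it from the JDBC source; forcing an action right after persist() caches the partitions
first. loadFromJdbc() below is a placeholder for however the PairRDD is actually built.

import org.apache.spark.storage.StorageLevel

val data = loadFromJdbc()                        // placeholder for the JDBC-backed PairRDD
  .persist(StorageLevel.MEMORY_AND_DISK)         // spill instead of recomputing if it doesn't fit
data.count()                                     // materialize the cache once, up front

var i = 0
while (i < 10) {
  val dataMod = data.map { case (k, v) => (k + 1, v) }
  val dataJoined = data.join(dataMod)
  // ... do stuff with dataJoined ...
  i += 1
}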



Re: Another issue with using lag and lead with data frames

2015-08-14 Thread Jerry
Still not cooperating...

lag(A,1,'X') OVER (ORDER BY A) as LA
  ^
at scala.sys.package$.error(package.scala:27)
at
org.apache.spark.sql.catalyst.SqlParser.parseExpression(SqlParser.scala:45)
at
org.apache.spark.sql.DataFrame$$anonfun$selectExpr$1.apply(DataFrame.scala:626)
at
org.apache.spark.sql.DataFrame$$anonfun$selectExpr$1.apply(DataFrame.scala:625)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.DataFrame.selectExpr(DataFrame.scala:625)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:21)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:26)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:30)
at $iwC$$iwC$$iwC$$iwC.(:32)
at $iwC$$iwC$$iwC.(:34)
at $iwC$$iwC.(:36)
at $iwC.(:38)
at (:40)
at .(:44)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


On Fri, Aug 14, 2015 at 1:39 PM, Jerry  wrote:

> Hi Salih,
> Normally I do sort before performing that operation, but since I've been
> trying to get this working for a week, I'm just loading something simple to
> test if lag works. Earlier I was having DB issues  so it's been a long
> run of solving one runtime exception after another. Hopefully those links
> point me to something useful. Let me know if you can run the above code/
> what you did different to get that code to run.
>
> Thanks,
>   Jerry
>
> On Fri, Aug 14, 2015 at 1:23 PM, Salih Oztop  wrote:
>
>> Hi Jerry,
>> This blog post is perfect for window functions in Spark.
>>
>> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
>> and a generic sql usage from oracle-base blog.
>> https://oracle-base.com/articles/misc/lag-lead-analytic-functions
>>
>> It seems you are not using Window part for Order By clause.
>>
>> Kind Regards
>> Salih Oztop
>>
>> --
>> *From:* Jerry 
>> *To:* user 
>> *Sent:* Friday, August 14, 201

Re: Another issue with using lag and lead with data frames

2015-08-14 Thread Jerry
Hi Salih,
Normally I do sort before performing that operation, but since I've been
trying to get this working for a week, I'm just loading something simple to
test if lag works. Earlier I was having DB issues  so it's been a long
run of solving one runtime exception after another. Hopefully those links
point me to something useful. Let me know if you can run the above code/
what you did different to get that code to run.

Thanks,
  Jerry

On Fri, Aug 14, 2015 at 1:23 PM, Salih Oztop  wrote:

> Hi Jerry,
> This blog post is perfect for window functions in Spark.
>
> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
> and a generic sql usage from oracle-base blog.
> https://oracle-base.com/articles/misc/lag-lead-analytic-functions
>
> It seems you are not using Window part for Order By clause.
>
> Kind Regards
> Salih Oztop
>
> --
> *From:* Jerry 
> *To:* user 
> *Sent:* Friday, August 14, 2015 5:50 PM
> *Subject:* Another issue with using lag and lead with data frames
>
> So it seems like dataframes aren't going give me a break and just work.
> Now it evaluates but goes nuts if it runs into a null case OR doesn't know
> how to get the correct data type when I specify the default value as a
> string expression. Let me know if anyone has a work around to this. PLEASE
> HELP ME!!!  THIS IS DRIVING ME NUTS! Below is what I used:
>
> JSON:
> {"A":"a"},
> {"A":"c"},
> {"A":"B"},
> {"A":"d"},
> {"A":"A"},
> {"A":null}
> Reading json:
> df = sqlContext.jsonFile("/home/./Desktop/trash.json")
>
>
> CASE 1 (no default):
>
> *$ dfb = df.selectExpr("lag(A,1)")$ dfb.show()*
> Java.lang.NullPointerException
> at
> org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.evaluate(GenericUDFLeadLag.java:57)
> at org.apache.spark.sql.hive.HiveGenericUdf.eval(hiveUdfs.scala:188)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to
> (Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:662)
> 15/08/14 09:17:29 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID
> 19, localhost): java.lang.NullPointerException
> at
> org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.evaluate(GenericUDFLeadLag.java:57)
> at org.apache.spark.sql.hive.HiveGenericUdf.eval(hiveUdfs.scala:188)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>

Re: Spark RuntimeException hadoop output format

2015-08-14 Thread Mohit Anchlia
Spark 1.3

Code:

wordCounts.foreachRDD(*new* *Function2, Time,
Void>()* {

@Override

*public* Void call(JavaPairRDD rdd, Time time) *throws*
IOException {

String counts = "Counts at time " + time + " " + rdd.collect();

System.*out*.println(counts);

System.*out*.println("Appending to " + outputFile.getAbsolutePath());

Files.*append*(counts + "\n", outputFile, Charset.*defaultCharset*());

*return* *null*;

}

});

wordCounts.saveAsHadoopFiles(outputPath, "txt", Text.*class*, Text.*class*,
TextOutputFormat.*class*);


What do I need to check in namenode? I see 0 bytes files like this:


drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
/tmp/out-1439495124000.txt
drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
/tmp/out-1439495125000.txt
drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
/tmp/out-1439495126000.txt
drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
/tmp/out-1439495127000.txt
drwxr-xr-x   - ec2-user supergroup  0 2015-08-13 15:45
/tmp/out-1439495128000.txt



However, I also wrote data to a local file on the local file system for
verification and I see the data:


$ ls -ltr !$
ls -ltr /tmp/out
-rw-r--r-- 1 yarn yarn 5230 Aug 13 15:45 /tmp/out


On Fri, Aug 14, 2015 at 6:15 AM, Ted Yu  wrote:

> Which Spark release are you using ?
>
> Can you show us snippet of your code ?
>
> Have you checked namenode log ?
>
> Thanks
>
>
>
> On Aug 13, 2015, at 10:21 PM, Mohit Anchlia 
> wrote:
>
> I was able to get this working by using an alternative method however I
> only see 0 bytes files in hadoop. I've verified that the output does exist
> in the logs however it's missing from hdfs.
>
> On Thu, Aug 13, 2015 at 10:49 AM, Mohit Anchlia 
> wrote:
>
>> I have this call trying to save to hdfs 2.6
>>
>> wordCounts.saveAsNewAPIHadoopFiles("prefix", "txt");
>>
>> but I am getting the following:
>> java.lang.RuntimeException: class scala.runtime.Nothing$ not
>> org.apache.hadoop.mapreduce.OutputFormat
>>
>
>


Re: Another issue with using lag and lead with data frames

2015-08-14 Thread Salih Oztop
Hi Jerry,
This blog post is perfect for window functions in Spark:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
and a generic SQL usage from the oracle-base blog:
https://oracle-base.com/articles/misc/lag-lead-analytic-functions

It seems you are not using the Window part for the Order By clause.

Kind Regards
Salih Oztop
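
A minimal sketch of lag() through the DataFrame window API rather than selectExpr,
assuming Spark 1.4+ and a HiveContext (which window functions required at the time),
with the column "A" from the question:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val w = Window.orderBy("A")                       // the ORDER BY lives in the window spec
val withLag = df.select(df("A"), lag("A", 1, "X").over(w).as("LA"))
withLag.show()
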
From: Jerry
To: user
Sent: Friday, August 14, 2015 5:50 PM
Subject: Another issue with using lag and lead with data frames

So it seems like dataframes aren't going to give me a break and just work. Now it
evaluates but goes nuts if it runs into a null case OR doesn't know how to get
the correct data type when I specify the default value as a string expression.
Let me know if anyone has a workaround to this. PLEASE HELP ME!!!  THIS IS
DRIVING ME NUTS! Below is what I used:

JSON:
{"A":"a"},
{"A":"c"},
{"A":"B"},
{"A":"d"},
{"A":"A"},
{"A":null}
Reading json:
df = sqlContext.jsonFile("/home/./Desktop/trash.json")


CASE 1 (no default):
$ dfb = df.selectExpr("lag(A,1)")
$ dfb.show()
Java.lang.NullPointerException
    at 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.evaluate(GenericUDFLeadLag.java:57)
    at org.apache.spark.sql.hive.HiveGenericUdf.eval(hiveUdfs.scala:188)
    at 
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
    at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
    at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
    at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
    at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
15/08/14 09:17:29 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 19, 
localhost): java.lang.NullPointerException
    at 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.evaluate(GenericUDFLeadLag.java:57)
    at org.apache.spark.sql.hive.HiveGenericUdf.eval(hiveUdfs.scala:188)
    at 
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
    at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
    at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 

QueueStream Does Not Support Checkpointing

2015-08-14 Thread Asim Jalis
I want to test some Spark Streaming code that is using
reduceByKeyAndWindow. If I do not enable checkpointing, I get the error:

java.lang.IllegalArgumentException: requirement failed: The checkpoint
> directory has not been set. Please set it by StreamingContext.checkpoint().


But if I enable checkpointing I get

queueStream doesn't support checkpointing


Is there a workaround for this?

My goal is to test that the windowing logic in my code is correct. Is there
a way to disable these strict checks or a different dstream I can use that
I can populate programmatically and then use for testing?

Thanks.

Asim


Re: Maintaining Kafka Direct API Offsets

2015-08-14 Thread Dan Dutrow
Thanks. Looking at the KafkaCluster.scala code, (
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L253),
it seems a little hacky for me to alter and recompile spark to expose those
methods, so I'll use the receiver API for the time being and watch for
changes as this API evolves to make those methods a little bit more
accessible. Meanwhile, I'll look into incorporating a database, like maybe
Tachyon, to persist offset and state data across redeployments.
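
For reference, a sketch of the usual pattern for keeping offsets yourself with the
direct API: read the offset ranges off each batch's RDD and persist them wherever you
like. directStream and saveOffset are placeholders, not from the thread.

import org.apache.spark.streaming.kafka.HasOffsetRanges

directStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  offsetRanges.foreach { o =>
    // hypothetical helper that writes to ZK, a database, Tachyon, etc.
    saveOffset(o.topic, o.partition, o.untilOffset)
  }
}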

On Fri, Aug 14, 2015 at 3:21 PM Cody Koeninger  wrote:

> I don't entirely agree with that assessment.  Not paying for extra cores
> to run receivers was about as important as delivery semantics, as far as
> motivations for the api.
>
> As I said in the jira tickets on the topic, if you want to use the direct
> api and save offsets to ZK, you can.   The right way to make that easier is
> to expose the (currently private) methods that already exist in
> KafkaCluster.scala for committing offsets through Kafka's api.  I don't
> think adding another "do the wrong thing" option is beneficial.
>
> On Fri, Aug 14, 2015 at 11:34 AM, dutrow  wrote:
>
>> In summary, it appears that the use of the DirectAPI was intended
>> specifically to enable exactly-once semantics. This can be achieved for
>> idempotent transformations and with transactional processing using the
>> database to guarantee an "onto" mapping of results based on inputs. For
>> the
>> latter, you need to store your offsets in the database of record.
>>
>> If you as a developer do not necessarily need exactly-once semantics, then
>> you can probably get by fine using the receiver API.
>>
>> The hope is that one day the Direct API could be augmented with
>> Spark-abstracted offset storage (with zookeeper, kafka, or something else
>> outside of the Spark checkpoint), since this would allow developers to
>> easily take advantage of the Direct API's performance benefits and
>> simplification of parallelism. I think it would be worth adding, even if
>> it
>> were to come with some "buyer beware" caveats.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tp24246p24273.html
>
>
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-08-14 Thread Dmitry Goldenberg
Our additional question on checkpointing is basically the logistics of it --

At which point does the data get written into checkpointing?  Is it written
as soon as the driver program retrieves an RDD from Kafka (or another
source)?  Or, is it written after that RDD has been processed and we're
basically moving on to the next RDD?

What I'm driving at is, what happens if the driver program is killed?  The
next time it's started, will it know, from Spark Streaming's checkpointing,
to resume from the same RDD that was being processed at the time of the
program getting killed?  In other words, will we, upon restarting the
consumer, resume from the RDD that was unfinished, or will we be looking at
the next RDD?

Will we pick up from the last known *successfully processed* topic offset?

Thanks.




On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen  wrote:

> If you've set the checkpoint dir, it seems like indeed the intent is
> to use a default checkpoint interval in DStream:
>
> private[streaming] def initialize(time: Time) {
> ...
>   // Set the checkpoint interval to be slideDuration or 10 seconds,
> which ever is larger
>   if (mustCheckpoint && checkpointDuration == null) {
> checkpointDuration = slideDuration * math.ceil(Seconds(10) /
> slideDuration).toInt
> logInfo("Checkpoint interval automatically set to " +
> checkpointDuration)
>   }
>
> Do you see that log message? what's the interval? that could at least
> explain why it's not doing anything, if it's quite long.
>
> It sort of seems wrong though since
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
> suggests it was intended to be a multiple of the batch interval. The
> slide duration wouldn't always be relevant anyway.
>
> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>  wrote:
> > I've instrumented checkpointing per the programming guide and I can tell
> > that Spark Streaming is creating the checkpoint directories but I'm not
> > seeing any content being created in those directories nor am I seeing the
> > effects I'd expect from checkpointing.  I'd expect any data that comes
> into
> > Kafka while the consumers are down, to get picked up when the consumers
> are
> > restarted; I'm not seeing that.
> >
> > For now my checkpoint directory is set to the local file system with the
> > directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
> > subdirectory named with a UUID being created under there but no files.
> >
> > I'm using a custom JavaStreamingContextFactory which creates a
> > JavaStreamingContext with the directory set into it via the
> > checkpoint(String) method.
> >
> > I'm currently not invoking the checkpoint(Duration) method on the DStream
> > since I want to first rely on Spark's default checkpointing interval.  My
> > streaming batch duration millis is set to 1 second.
> >
> > Anyone have any idea what might be going wrong?
> >
> > Also, at which point does Spark delete files from checkpointing?
> >
> > Thanks.
>
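
A small sketch of making the checkpoint interval explicit instead of relying on the
derived default, assuming the 1-second batch interval from the thread; the socket
source stands in for the Kafka direct stream and all names/paths are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("consumer"), Seconds(1))
ssc.checkpoint("hdfs://namenode:8020/checkpoints/consumer")

val lines = ssc.socketTextStream("localhost", 9999)   // stand-in for the Kafka direct stream
val windowed = lines.window(Seconds(30), Seconds(10))
windowed.checkpoint(Seconds(10))                      // explicit checkpoint interval for this DStream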


Re: Maintaining Kafka Direct API Offsets

2015-08-14 Thread Cody Koeninger
I don't entirely agree with that assessment.  Not paying for extra cores to
run receivers was about as important as delivery semantics, as far as
motivations for the api.

As I said in the jira tickets on the topic, if you want to use the direct
api and save offsets to ZK, you can.   The right way to make that easier is
to expose the (currently private) methods that already exist in
KafkaCluster.scala for committing offsets through Kafka's api.  I don't
think adding another "do the wrong thing" option is beneficial.

On Fri, Aug 14, 2015 at 11:34 AM, dutrow  wrote:

> In summary, it appears that the use of the DirectAPI was intended
> specifically to enable exactly-once semantics. This can be achieved for
> idempotent transformations and with transactional processing using the
> database to guarantee an "onto" mapping of results based on inputs. For the
> latter, you need to store your offsets in the database of record.
>
> If you as a developer do not necessarily need exactly-once semantics, then
> you can probably get by fine using the receiver API.
>
> The hope is that one day the Direct API could be augmented with
> Spark-abstracted offset storage (with zookeeper, kafka, or something else
> outside of the Spark checkpoint), since this would allow developers to
> easily take advantage of the Direct API's performance benefits and
> simplification of parallelism. I think it would be worth adding, even if it
> were to come with some "buyer beware" caveats.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tp24246p24273.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Spark Job Hangs on our production cluster

2015-08-14 Thread java8964
I still want to check if anyone can provide any help with why Spark 1.2.2
hangs on our production cluster when reading big HDFS data (7800 avro
blocks), while it looks fine for small data (769 avro blocks).
I enabled the debug level in the Spark log4j configuration, and attached the log
file in case it helps with troubleshooting.
Summary of our cluster:
- IBM BigInsight V3.0.0.2 (running with Hadoop 2.2.0 + Hive 0.12)
- 42 data nodes, each running an HDFS data node process + task tracker + Spark worker
- One master, running the HDFS name node + Spark master
- Another master node, running the 2nd name node + JobTracker
I ran two test cases, using a very simple spark-shell session to read two folders:
one is big data with 1T of avro files; the other is small data with 160G of avro files.
The avro schemas of the two folders are different, but I don't think that makes any
difference here.
The test script is like the following:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import com.databricks.spark.avro._
val testdata = sqlContext.avroFile("hdfs://namenode:9000/bigdata_folder")   // vs sqlContext.avroFile("hdfs://namenode:9000/smalldata_folder")
testdata.registerTempTable("testdata")
testdata.count()
Both cases are kicked off the same way:

/opt/spark/bin/spark-shell --jars /opt/ibm/cclib/spark-avro.jar --conf spark.ui.port=4042 --executor-memory 24G --total-executor-cores 42 --conf spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=2000 --conf spark.default.parallelism=2000
When the script points to the small data folder, Spark finishes very fast; each task
scanning an HDFS block finishes within 30 seconds or less.
When the script points to the big data folder, most of the nodes finish scanning the
first HDFS block within 2 minutes (longer than case 1), then the scanning hangs across
all the nodes in the cluster, which means no task can continue any more. The whole job
hangs until I have to kill it.
There are logs attached in this email, and here is what I can read from the log 
files:
1) Spark-finished.log, which is the log generated by Spark in the good case.
In this case, it is clear there is a loop reading the data from HDFS, looping like:

15/08/14 14:38:05 INFO HadoopRDD: Input split:
15/08/14 14:37:40 DEBUG Client: IPC Client (370155726) connection to p2-bigin101/10.20.95.130:9000 from
15/08/14 14:37:40 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 2ms
15/08/14 14:38:32 INFO HadoopRDD: Input split:
There are exceptions in it, like:

java.lang.NoSuchMethodException: org.apache.hadoop.fs.FileSystem$Statistics.getThreadStatistics()
at java.lang.Class.getDeclaredMethod(Class.java:2009)
at org.apache.spark.util.Utils$.invoke(Utils.scala:1827)
at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$getFileSystemThreadStatistics$1.apply(SparkHadoopUtil.scala:179)
at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$getFileSystemThreadStatistics$1.apply(SparkHadoopUtil.scala:179)

But it doesn't affect the functionality and didn't fail the job.
2) Spark-hang.log, which is from the same node, generated by Spark in the hang case.
In this case, it looks like Spark can read the data from HDFS the first time, as the
log looks the same as in the good case, but after that only the following DEBUG
messages are output:

15/08/14 14:24:19 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
15/08/14 14:24:19 DEBUG Worker: [actor] handled message (0.121965 ms) SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
15/08/14 14:24:34 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
15/08/14 14:24:34 DEBUG Worker: [actor] handled message (0.135455 ms) SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
15/08/14 14:24:49 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
There are no more "connecting to datanode" messages; after 10 minutes I had to just
kill the executor.
During those 10 minutes, I ran "jstack" twice on the Spark Java process, trying to find
out which thread is being blocked; the outputs are attached as "2698306-1.log" and
"2698306-2.log" (2698306 is the pid).
Can someone give me any hint about what the root cause of this could be? While Spark is
hanging reading the big dataset, HDFS is healthy: I can get/put data in HDFS, and the MR
jobs running at the same time continue without any problems.
I am thinking of generating a 1T folder of text files to test Spark in this cluster, as
I want to rule out any problem related to Avro, but it will take a while for me to
generate that. I am not sure if the Avro format could be the cause.
Thanks for your help.
Yong
From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Spark Job Hangs on our production 

Fwd: Graphx - how to add vertices to a HashSet of vertices ?

2015-08-14 Thread Ranjana Rajendran
-- Forwarded message --
From: Ranjana Rajendran 
Date: Thu, Aug 13, 2015 at 7:37 AM
Subject: Graphx - how to add vertices to a HashSet of vertices ?
To: d...@spark.apache.org


Hi,

sampledVertices is a HashSet of vertices

  var sampledVertices: HashSet[VertexId] = HashSet()

In each iteration, I am making a list of neighborVertexIds

  val neighborVertexIds = burnEdges.map((e:Edge[Int]) => e.dstId)

I want to add this neighborVertexIds to the sampledVertices Hashset.

What is the best way to do this ?

   Currently, I have

sampledVertices = sampledVertices ++ neighborVertexIds.toArray

I realize, toArray is making this a separate stage.

What will be the most efficient way to achieve this ?

Similarly, I need to add the neighborVertexIds to burnQueue where burnQueue
is is a queue of vertices

var burnQueue: Queue[VertexId] = Queue()

Thank you,

Ranjana


Re: Maintaining Kafka Direct API Offsets

2015-08-14 Thread dutrow
In summary, it appears that the use of the DirectAPI was intended
specifically to enable exactly-once semantics. This can be achieved for
idempotent transformations and with transactional processing using the
database to guarantee an "onto" mapping of results based on inputs. For the
latter, you need to store your offsets in the database of record.

If you as a developer do not necessarily need exactly-once semantics, then
you can probably get by fine using the receiver API. 

The hope is that one day the Direct API could be augmented with
Spark-abstracted offset storage (with zookeeper, kafka, or something else
outside of the Spark checkpoint), since this would allow developers to
easily take advantage of the Direct API's performance benefits and
simplification of parallelism. I think it would be worth adding, even if it
were to come with some "buyer beware" caveats.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tp24246p24273.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Another issue with using lag and lead with data frames

2015-08-14 Thread Jerry
So it seems like dataframes aren't going to give me a break and just work. Now
it evaluates, but goes nuts if it runs into a null case OR doesn't know how
to get the correct data type when I specify the default value as a string
expression. Let me know if anyone has a workaround for this. PLEASE HELP
ME!!!  THIS IS DRIVING ME NUTS! Below is what I used:

JSON:
{"A":"a"},
{"A":"c"},
{"A":"B"},
{"A":"d"},
{"A":"A"},
{"A":null}
Reading json:
df = sqlContext.jsonFile("/home/./Desktop/trash.json")


CASE 1 (no default):

$ dfb = df.selectExpr("lag(A,1)")
$ dfb.show()
java.lang.NullPointerException
at
org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.evaluate(GenericUDFLeadLag.java:57)
at org.apache.spark.sql.hive.HiveGenericUdf.eval(hiveUdfs.scala:188)
at
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
15/08/14 09:17:29 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 19,
localhost): java.lang.NullPointerException
at
org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.evaluate(GenericUDFLeadLag.java:57)
at org.apache.spark.sql.hive.HiveGenericUdf.eval(hiveUdfs.scala:188)
at
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Ex

Re: How to specify column type when saving DataFrame as parquet file?

2015-08-14 Thread Francis Lau
Jyun Fan

Here is how I have been doing it. I found that I needed to define the
schema when loading the JSON file first

Francis

import datetime
from pyspark.sql.types import *

# Define schema
upSchema = StructType([
  StructField("field 1", StringType(), True),
  StructField("field 2", LongType(), True),
  StructField("field 3", TimestampType(), True),
  StructField("field 4", DoubleType(), True)
  ])

# Load JSON file with schema
filePath = "YourData.json"
DF = sqlContext.read.schema(upSchema).json(filePath)

# Save to Parquet
savePath = "ConvertedData.parquet"

# adjust repartition number below based on size of data, I try to keep
parquet files to
# be under 500 MB and avoid many small files as well i.e. hundreds of 10 MB
files
DF.repartition(1).write.parquet(savePath)

On Fri, Aug 14, 2015 at 7:29 AM, Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> I think you can try dataFrame create api that takes RDD[Row] and Struct
> type...
> On Aug 11, 2015 4:28 PM, "Jyun-Fan Tsai"  wrote:
>
>> Hi all,
>> I'm using Spark 1.4.1.  I create a DataFrame from json file.  There is
>> a column C that all values are null in the json file.  I found that
>> the datatype of column C in the created DataFrame is string.  However,
>> I would like to specify the column as Long when saving it as parquet
>> file.  What should I do to specify the column type when saving parquet
>> file?
>>
>> Thank you,
>> Jyun-Fan Tsai
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


-- 
*Francis Lau* | *Smartsheet*
Senior Director of Product Intelligence
*c* 425-830-3889 (call/text)
francis@smartsheet.com 


Re: Maintaining Kafka Direct API Offsets

2015-08-14 Thread dutrow
For those who find this post and may be interested, the most thorough
documentation on the subject may be found here:
https://github.com/koeninger/kafka-exactly-once



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tp24246p24270.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Maintaining Kafka Direct API Offsets

2015-08-14 Thread Cody Koeninger
Use your email client to send a message to the mailing list from the email
address you used to subscribe?

The message you just sent reached the list

On Fri, Aug 14, 2015 at 9:36 AM, dutrow  wrote:

> How do I get beyond the "This post has NOT been accepted by the mailing
> list
> yet" message? This message was posted through the nabble interface; one
> would think that would be enough to get the message accepted.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tp24246p24268.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Maintaining Kafka Direct API Offsets

2015-08-14 Thread dutrow
How do I get beyond the "This post has NOT been accepted by the mailing list
yet" message? This message was posted through the nabble interface; one
would think that would be enough to get the message accepted.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tp24246p24268.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Cannot cast to Tuple when running in cluster mode

2015-08-14 Thread Saif.A.Ellafi
Hi All,

I have a working program in which I create two big Tuple2s out of the data.
This seems to work in local mode, but when I switch over to cluster standalone mode, I
get this error at the very beginning:

15/08/14 10:22:25 WARN TaskSetManager: Lost task 4.0 in stage 1.0 (TID 10, 
162.101.194.44): java.lang.ClassCastException: 
scala.collection.Iterator$$anon$13 cannot be cast to scala.Tuple2
at 
org.apache.spark.sql.DataFrame$$anonfun$33.apply(DataFrame.scala:1189)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The data comes from JDBC, but I also tried persisting it into memory to turn it 
into a collection, in case JDBC was the problem.

Any advice?
Saif



Re: Spark Streaming: Change Kafka topics on runtime

2015-08-14 Thread Cody Koeninger
There's a long recent thread in this list about stopping apps, subject was
"stopping spark stream app"

At 1 second I wouldn't run repeated RDDs, no.

I'd take a look at subclassing, personally (you'll have to rebuild the
streaming-kafka project since a lot of it is private), but if topic changes don't
happen that often, restarting the app when they do should be fine.
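
A rough sketch of that stop/start route (the topic lookup and broker address below are stand-ins), keeping one SparkContext alive and rebuilding the StreamingContext whenever the topic set changes:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object RestartOnTopicChange {
  // Stand-in for reading the current topic list from the DB.
  def currentTopics(): Set[String] = Set("topicA", "topicB")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("restart-on-topic-change"))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    var topics = currentTopics()
    while (true) {
      val ssc = new StreamingContext(sc, Seconds(5))
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
      stream.foreachRDD(rdd => println("got " + rdd.count() + " records"))
      ssc.start()

      // Poll until the topic set changes, then tear this context down and loop.
      while (currentTopics() == topics) ssc.awaitTerminationOrTimeout(30000)
      topics = currentTopics()
      ssc.stop(stopSparkContext = false, stopGracefully = true)
    }
  }
}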

On Fri, Aug 14, 2015 at 6:34 AM, Nisrina Luthfiyati <
nisrina.luthfiy...@gmail.com> wrote:

> Hi Cody,
>
> by start/stopping, do you mean the streaming context or the app entirely?
> From what I understand once a streaming context has been stopped it cannot
> be restarted, but I also haven't found a way to stop the app
> programmatically.
>
> The batch duration will probably be around 1-10 seconds. I think this is
> small enough to not make it a batch job?
>
> Thanks again
>
> On Thu, Aug 13, 2015 at 10:15 PM, Cody Koeninger 
> wrote:
>
>> The current kafka stream implementation assumes the set of topics doesn't
>> change during operation.
>>
>> You could either take a crack at writing a subclass that does what you
>> need; stop/start; or if your batch duration isn't too small, you could run
>> it as a series of RDDs (using the existing KafkaUtils.createRDD) where the
>> set of topics is determined before each rdd.
>>
>> On Thu, Aug 13, 2015 at 4:38 AM, Nisrina Luthfiyati <
>> nisrina.luthfiy...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I want to write a Spark Streaming program that listens to Kafka for a
>>> list of topics.
>>> The list of topics that I want to consume is stored in a DB and might
>>> change dynamically. I plan to periodically refresh this list of topics in
>>> the Spark Streaming app.
>>>
>>> My question is is it possible to add/remove a Kafka topic that is
>>> consumed by a stream, or probably create a new stream at runtime?
>>> Would I need to stop/start the program or is there any other way to do
>>> this?
>>>
>>> Thanks!
>>> Nisrina
>>>
>>
>>
>
>
> --
> Nisrina Luthfiyati - Ilmu Komputer Fasilkom UI 2010
> http://www.facebook.com/nisrina.luthfiyati
> http://id.linkedin.com/in/nisrina
>
>


Re: Spark job endup with NPE

2015-08-14 Thread Akhil Das
You can put a try..catch around all the transformations that you are doing
and catch such exceptions instead of crashing your entire job.
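
Using the mongoRDD from the quoted code below, that guarded flatMap could look roughly like this (a sketch; it simply skips documents whose "text" field is missing):

val wordRDD = mongoRDD.flatMap { case (_, doc) =>
  try {
    // doc.get("text") returns null when the field is absent, so toString throws the NPE
    doc.get("text").toString.split("\\s+").toSeq.filter(_.nonEmpty)
  } catch {
    case _: NullPointerException => Seq.empty[String]  // skip the bad document
  }
}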

Thanks
Best Regards

On Fri, Aug 14, 2015 at 4:35 PM, hide  wrote:

> Hello,
>
> I'm using spark on yarn cluster and using mongo-hadoop-connector to pull
> data to spark, doing some job
> The job has following stage.
> (flatMap -> flatMap -> reduceByKey -> sortByKey)
>
> The data in MongoDB is tweet from twitter.
>
> First, connect to mongodb and make RDD by following
>
> val mongoRDD = sc.newAPIHadoopRDD(mongoConfig,
> classOf[com.mongodb.hadoop.MongoInputFormat],classOf[Object],
> classOf[BSONObject])
>
> Set "mongo.input.fields" as below
> mongoConfig.set("mongo.input.fields", "{\"_id\": 1, \"text\" : 1}")
>
> the data inside of mongoRDD is looks like
>
> (558baf...,
> { "_id" : { "$oid" : "558baf…"} , "text" : “Apache spark is Lightning-fast
> cluster …” })
> (558baf...,
> { "_id" : { "$oid" : "558baf…"} , "text" : “hello, my  …” })
> (558baf...,
> { "_id" : { "$oid" : "558baf…"} , "text" : “Hi, aaa …” })
>
>
> Nex stage, I use flatMap, inside flatMap getting "text" element(tweet) and
> dividing them to word.
>
> val wordRDD = mongoRDD.flatMap(arg => {
>
>   var str = arg._2.get("text").toString
>
>// using tokenizer to divid tweet to word or just split tweet by
> white space
> }
>
> After this, wordRDD is looks like
>
> ("Apache", "spark", "is", "Lightning-fast", "cluster", "hello", "my",
> ..)
>
>
> When I trying to print every element in wordRDD, I get following error. I
> know that the tweet involve with newline charactor or space or tab, but
> what
> makes this NPE?
>
> Is this error "Iterator$$anon$13.hasNext" means iterating though RDD and
> the
> next value is null ?
>
>
> 15/08/13 22:15:14 INFO scheduler.TaskSetManager: Starting task 2766.3 in
> stage 14.0 (TID 11136, iot-spark02, RACK_LOCAL, 1951 bytes)
> 15/08/13 22:18:53 WARN scheduler.TaskSetManager: Lost task 2766.3 in stage
> 14.0 (TID 11136, iot-spark02): java.lang.NullPointerException
> at
>
> $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:85)
> at
>
> $line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:73)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
>
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
> at
>
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Can I avoid this error by wrapping the word by scala.Option ?
> If anybody know why, please help me?
>
> Thanks,
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-endup-with-NPE-tp24264.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to specify column type when saving DataFrame as parquet file?

2015-08-14 Thread Raghavendra Pandey
I think you can try the DataFrame create API that takes an RDD[Row] and a
StructType...
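
A sketch of that (the input/output paths and the extra column "A" are made up; only the all-null column "C" comes from the question):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

object CastNullColumnSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cast-null-column"))
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read.json("/path/to/input.json")  // "C" gets inferred as string

    // The schema we actually want to write, with C as a nullable long.
    val targetSchema = StructType(Seq(
      StructField("A", StringType, nullable = true),
      StructField("C", LongType, nullable = true)))

    // Rebuild each row so the values line up with the target schema.
    val rows = df.select("A", "C").map { r =>
      val c = if (r.isNullAt(1)) null else java.lang.Long.valueOf(r.getString(1))
      Row(r.getString(0), c)
    }

    sqlContext.createDataFrame(rows, targetSchema).write.parquet("/path/to/output.parquet")
    sc.stop()
  }
}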
On Aug 11, 2015 4:28 PM, "Jyun-Fan Tsai"  wrote:

> Hi all,
> I'm using Spark 1.4.1.  I create a DataFrame from json file.  There is
> a column C that all values are null in the json file.  I found that
> the datatype of column C in the created DataFrame is string.  However,
> I would like to specify the column as Long when saving it as parquet
> file.  What should I do to specify the column type when saving parquet
> file?
>
> Thank you,
> Jyun-Fan Tsai
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Left outer joining big data set with small lookups

2015-08-14 Thread Silvio Fiorito
You could cache the lookup DataFrames, it’ll then do a broadcast join.
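
A minimal sketch of that (the paths, column names, and threshold value below are placeholders, not from the original question), assuming a Spark 1.4 SQLContext:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-join-sketch"))
    val sqlContext = new SQLContext(sc)

    // Small tables are only broadcast when the planner believes they fit under
    // spark.sql.autoBroadcastJoinThreshold (roughly 10 MB by default).
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

    val facts  = sqlContext.read.parquet("/data/facts")           // the ~140GB fact table
    val lookup = sqlContext.read.parquet("/data/lookup").cache()  // small dimension table
    lookup.count()  // materialize the cache so its size statistics are known

    val joined = facts.join(lookup, facts("dim_id") === lookup("id"), "left_outer")
    joined.write.parquet("/data/joined")
    sc.stop()
  }
}

If the planner still shuffles, joined.explain() shows whether a broadcast join was actually picked.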




On 8/14/15, 9:39 AM, "VIJAYAKUMAR JAWAHARLAL"  wrote:

>Hi
>
>I am facing huge performance problem when I am trying to left outer join very 
>big data set (~140GB) with bunch of small lookups [Start schema type]. I am 
>using data frame  in spark sql. It looks like data is shuffled and skewed when 
>that join happens. Is there any way to improve performance of such type of 
>join in spark? 
>
>How can I hint optimizer to go with replicated join etc., to avoid shuffle? 
>Would it help to create broadcast variables on small lookups?  If I create 
>broadcast variables, how can I convert them into data frame and use them in 
>sparksql type of join?
>
>Thanks
>Vijay
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Left outer joining big data set with small lookups

2015-08-14 Thread VIJAYAKUMAR JAWAHARLAL
Hi

I am facing a huge performance problem when I am trying to left outer join a very
big data set (~140GB) with a bunch of small lookups [star schema type]. I am
using data frames in Spark SQL. It looks like the data is shuffled and skewed when
that join happens. Is there any way to improve the performance of this type of join
in Spark?

How can I hint the optimizer to go with a replicated join etc., to avoid the shuffle?
Would it help to create broadcast variables for the small lookups? If I create
broadcast variables, how can I convert them into data frames and use them in a
Spark SQL join?

Thanks
Vijay
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Error: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration

2015-08-14 Thread Stephen Boesch
A NoClassDefFoundError differs from a ClassNotFoundException: it
indicates an error while initializing that class, even though the class is found
on the classpath. Please provide the full stack trace.

2015-08-14 4:59 GMT-07:00 stelsavva :

> Hello, I am just starting out with spark streaming and Hbase/hadoop, i m
> writing a simple app to read from kafka and store to Hbase, I am having
> trouble submitting my job to spark.
>
> I 've downloaded Apache Spark 1.4.1 pre-build for hadoop 2.6
>
> I am building the project with mvn package
>
> and submitting the jar file with
>
>  ~/Desktop/spark/bin/spark-submit --class org.example.main.scalaConsumer
> scalConsumer-0.0.1-SNAPSHOT.jar
>
> And then i am getting the error you see in the subject line. Is this a
> problem with my maven dependencies? do i need to install hadoop locally?
> And
> if so how can i add the hadoop classpath to the spark job?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-Exception-in-thread-main-java-lang-NoClassDefFoundError-org-apache-hadoop-hbase-HBaseConfiguran-tp24266.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark RuntimeException hadoop output format

2015-08-14 Thread Ted Yu
Which Spark release are you using ?

Can you show us snippet of your code ?

Have you checked namenode log ?

Thanks



> On Aug 13, 2015, at 10:21 PM, Mohit Anchlia  wrote:
> 
> I was able to get this working by using an alternative method however I only 
> see 0 bytes files in hadoop. I've verified that the output does exist in the 
> logs however it's missing from hdfs.
> 
>> On Thu, Aug 13, 2015 at 10:49 AM, Mohit Anchlia  
>> wrote:
>> I have this call trying to save to hdfs 2.6
>> wordCounts.saveAsNewAPIHadoopFiles("prefix", "txt");
>> 
>> but I am getting the following:
>> 
>> java.lang.RuntimeException: class scala.runtime.Nothing$ not 
>> org.apache.hadoop.mapreduce.OutputFormat
> 


Re: Spark Streaming: Change Kafka topics on runtime

2015-08-14 Thread Nisrina Luthfiyati
Hi Cody,

by start/stopping, do you mean the streaming context or the app entirely?
From what I understand, once a streaming context has been stopped it cannot
be restarted, but I also haven't found a way to stop the app
programmatically.

The batch duration will probably be around 1-10 seconds. I think this is
small enough to not make it a batch job?

Thanks again

On Thu, Aug 13, 2015 at 10:15 PM, Cody Koeninger  wrote:

> The current kafka stream implementation assumes the set of topics doesn't
> change during operation.
>
> You could either take a crack at writing a subclass that does what you
> need; stop/start; or if your batch duration isn't too small, you could run
> it as a series of RDDs (using the existing KafkaUtils.createRDD) where the
> set of topics is determined before each rdd.
>
> On Thu, Aug 13, 2015 at 4:38 AM, Nisrina Luthfiyati <
> nisrina.luthfiy...@gmail.com> wrote:
>
>> Hi all,
>>
>> I want to write a Spark Streaming program that listens to Kafka for a
>> list of topics.
>> The list of topics that I want to consume is stored in a DB and might
>> change dynamically. I plan to periodically refresh this list of topics in
>> the Spark Streaming app.
>>
>> My question is is it possible to add/remove a Kafka topic that is
>> consumed by a stream, or probably create a new stream at runtime?
>> Would I need to stop/start the program or is there any other way to do
>> this?
>>
>> Thanks!
>> Nisrina
>>
>
>


-- 
Nisrina Luthfiyati - Ilmu Komputer Fasilkom UI 2010
http://www.facebook.com/nisrina.luthfiyati
http://id.linkedin.com/in/nisrina


Error: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration

2015-08-14 Thread stelsavva
Hello, I am just starting out with Spark Streaming and HBase/Hadoop. I'm
writing a simple app to read from Kafka and store to HBase, and I am having
trouble submitting my job to Spark.

I've downloaded Apache Spark 1.4.1 pre-built for Hadoop 2.6.

I am building the project with mvn package

and submitting the jar file with

 ~/Desktop/spark/bin/spark-submit --class org.example.main.scalaConsumer
scalConsumer-0.0.1-SNAPSHOT.jar

And then I am getting the error you see in the subject line. Is this a
problem with my Maven dependencies? Do I need to install Hadoop locally? And
if so, how can I add the Hadoop classpath to the Spark job?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-Exception-in-thread-main-java-lang-NoClassDefFoundError-org-apache-hadoop-hbase-HBaseConfiguran-tp24266.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark job endup with NPE

2015-08-14 Thread hide
Hello, 

I'm using Spark on a YARN cluster and the mongo-hadoop-connector to pull
data into Spark and do some processing.
The job has the following stages:
(flatMap -> flatMap -> reduceByKey -> sortByKey)

The data in MongoDB is tweets from Twitter.

First, I connect to MongoDB and make an RDD as follows:

val mongoRDD = sc.newAPIHadoopRDD(mongoConfig,
classOf[com.mongodb.hadoop.MongoInputFormat],classOf[Object],
classOf[BSONObject])

Set "mongo.input.fields" as below
mongoConfig.set("mongo.input.fields", "{\"_id\": 1, \"text\" : 1}")

The data inside mongoRDD looks like:

(558baf...,
{ "_id" : { "$oid" : "558baf…"} , "text" : “Apache spark is Lightning-fast
cluster …” })
(558baf...,
{ "_id" : { "$oid" : "558baf…"} , "text" : “hello, my  …” })
(558baf...,
{ "_id" : { "$oid" : "558baf…"} , "text" : “Hi, aaa …” })


Next stage, I use flatMap; inside the flatMap I get the "text" element (the tweet)
and divide it into words.

val wordRDD = mongoRDD.flatMap(arg => {
  
  var str = arg._2.get("text").toString

   // using tokenizer to divid tweet to word or just split tweet by
white space
}

After this, wordRDD looks like:

("Apache", "spark", "is", "Lightning-fast", "cluster", "hello", "my",
..)


When I try to print every element in wordRDD, I get the following error. I
know that tweets can involve newline characters, spaces, or tabs, but what
causes this NPE?

Does this error at "Iterator$$anon$13.hasNext" mean that, while iterating
through the RDD, the next value is null?


15/08/13 22:15:14 INFO scheduler.TaskSetManager: Starting task 2766.3 in stage 14.0 (TID 11136, iot-spark02, RACK_LOCAL, 1951 bytes)
15/08/13 22:18:53 WARN scheduler.TaskSetManager: Lost task 2766.3 in stage 14.0 (TID 11136, iot-spark02): java.lang.NullPointerException
at
$line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:85)
  
at
$line129.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:73)
  
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Can I avoid this error by wrapping the word in scala.Option?
If anybody knows why this happens, please help me.

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-endup-with-NPE-tp24264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveToCassandra not working in Spark Job but works in Spark Shell

2015-08-14 Thread satish chandra j
Hi Akhil,
Which jar version is conflicting, and what needs to be done to fix it?

Regards,
Satish Chandra

On Fri, Aug 14, 2015 at 2:44 PM, Akhil Das 
wrote:

> Looks like a jar version conflict to me.
>
> Thanks
> Best Regards
>
> On Thu, Aug 13, 2015 at 7:59 PM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> HI,
>> Please let me know if I am missing anything in the below mail, to get the
>> issue fixed
>>
>> Regards,
>> Satish Chandra
>>
>> On Wed, Aug 12, 2015 at 6:59 PM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> HI,
>>>
>>> The below mentioned code is working very well fine in Spark Shell but
>>> when the same is placed in Spark Application it is errors as mentioned
>>> below:
>>>
>>> *Exception in thread "main" java.lang.NoSuchMethodError:
>>> com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions*
>>>
>>>
>>> *Code:*
>>>
>>> *import* *org.apache*.spark.SparkContext
>>>
>>> *import* *org.apache*.spark.SparkContext._
>>>
>>> *import* *org.apache*.spark.SparkConf
>>>
>>> *import* *org.apache*.spark.rdd.JdbcRDD
>>>
>>> *import* *com.datastax*.spark.connector._
>>>
>>> *import* com.datastax.spark.connector.cql.CassandraConnector
>>>
>>> *import* com.datastax.bdp.spark.DseSparkConfHelper._
>>>
>>> *import* java.sql.{Connection, DriverManager, ResultSet,
>>> PreparedStatement, SQLException, Statement}
>>>
>>> *object* HelloWorld {
>>>
>>> *def* main(args: Array[String]) {
>>>
>>>   *def* createSparkContext() = {
>>>
>>> *val* conf = *new* SparkConf().set(
>>> "spark.cassandra.connection.host", "10.246.43.15")
>>>
>>>.setAppName("First Spark App")
>>>
>>>.setMaster("local")
>>>
>>>.set("cassandra.username", "username")
>>>
>>>.set("cassandra.password", "password")
>>>
>>>.forDse
>>>
>>>*new* SparkContext(conf)
>>>
>>> }
>>>
>>>
>>>
>>>   *val* sc = createSparkContext()
>>>
>>>   *val* user="user"
>>>
>>>   *val** pass=*"password"
>>>
>>>   Class.forName("org.postgresql.Driver").newInstance
>>>
>>>   *val* url = "jdbc:postgresql://gptester:5432/db_test"
>>>
>>>   *val* myRDD27 = *new* JdbcRDD( sc, ()=>
>>> DriverManager.getConnection(url,user,pass),"select * from
>>> wmax_vmax.arm_typ_txt LIMIT ? OFFSET ?",5,0,1,(r: ResultSet) =>
>>> (r.getInt("alarm_type_code"),r.getString("language_code"),r.getString(
>>> "alrm_type_cd_desc")))
>>>
>>>  * myRDD27.saveToCassandra(*
>>> *"keyspace","arm_typ_txt",SomeColumns("alarm_type_code","language_code","alrm_type_cd_desc"))*
>>>
>>>   println(myRDD27.count())
>>>
>>>   println(myRDD27.first)
>>>
>>>   sc.stop()
>>>
>>>   sys.exit()
>>>
>>>
>>>
>>> }
>>>
>>>   }
>>>
>>>
>>>
>>> *Command: *
>>> dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld
>>> --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar etl-0.0.
>>> 1-SNAPSHOT.jar
>>>
>>> Please let me know if any solutions for this issue
>>>
>>> Regards,
>>> Satish Chandra
>>>
>>
>>
>


Re: spark.files.userClassPathFirst=true Return Error - Please help

2015-08-14 Thread Kyle Lin
Hello Akhil

I use Spark 1.4.2 on HDP 2.1(Hadoop 2.4)

I didn't use --driver-class-path. I only use
spark.executor.userClassPathFirst=true


Kyle


2015-08-14 17:11 GMT+08:00 Akhil Das :

> Which version of spark are you using? Did you try with --driver-class-path
> configuration?
>
> Thanks
> Best Regards
>
> On Fri, Aug 14, 2015 at 2:05 PM, Kyle Lin  wrote:
>
>> Hi all
>>
>> I had similar usage and also got the same problem.
>>
>> I guess Spark use some class in my user jars but actually it should use
>> the class in spark-assembly-xxx.jar, but I don't know how to fix it.
>>
>> Kyle
>>
>>
>>
>> 2015-07-22 23:03 GMT+08:00 Ashish Soni :
>>
>>> Hi All ,
>>>
>>> I am getting below error when i use the --conf
>>> spark.files.userClassPathFirst=true parameter
>>>
>>> Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times,
>>> most recent failure: Lost task 3.3 in stage 0.0 (TID 32, 10.200.37.161):
>>> java.lang.ClassCastException: cannot assign instance of scala.None$ to
>>> field org.apache.spark.scheduler.Task.metrics of type scala.Option in
>>> instance of org.apache.spark.scheduler.ResultTask
>>>
>>> I am using as below
>>>
>>> spark-submit --conf spark.files.userClassPathFirst=true --driver-memory
>>> 6g --executor-memory 12g --executor-cores 4   --class
>>> com.ericsson.engine.RateDriver --master local
>>> /home/spark/workspace/simplerating/target/simplerating-0.0.1-SNAPSHOT.jar
>>> spark://eSPARKMASTER:7077 hdfs://enamenode/user/spark
>>>
>>> thanks
>>>
>>
>>
>


Re: spark streaming map use external variable occur a problem

2015-08-14 Thread Shixiong Zhu
Looks like you compiled the code with one Scala version but ran your app with
a different, incompatible version.

BTW, you should not use PrintWriter like this to save your results. There
may be multiple tasks running on the same host, and your job will fail
because you are trying to write to the same file. Could you convert your
data to String using "map" and use "saveAsTextFile" or other "save" methods?
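
A minimal sketch of that (the data here is just a stand-in for whatever the job computes):

import org.apache.spark.{SparkConf, SparkContext}

object SaveAsTextSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-as-text-sketch"))
    val results = sc.parallelize(Seq(("spark", 3), ("kafka", 1)))

    // Format each record as a line and let Spark write one part file per partition,
    // instead of every task racing to write the same file with a PrintWriter.
    results.map { case (k, v) => s"$k\t$v" }
           .saveAsTextFile("hdfs:///tmp/results")

    // For a DStream the analogue is dstream.map(...).saveAsTextFiles(prefix, suffix).
    sc.stop()
  }
}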

Best Regards,
Shixiong Zhu

2015-08-14 11:02 GMT+08:00 kale <805654...@qq.com>:

>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Sorted Multiple Outputs

2015-08-14 Thread Yiannis Gkoufas
Hi Eugene,

in my case the list of values that I want to sort and write to a separate
file is fairly small, so the way I solved it is the following:

.groupByKey().foreach(e => {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig);
  val newPath = rootPath+"/"+e._1;
  val dstream = hdfs.create(new Path(newPath));
  val bstream = new BufferedOutputStream(dstream, 100 * 1024)
  val writer = new PrintWriter(bstream)
  e._2.toList.sortBy(_._1).foreach(sub => {
writer.println(Utils.getDateStr(sub._1)+","+sub._2+","+sub._3);
  })
  writer.flush()
  writer.close();
})


Not sure what I changed in the way I write to HDFS, but this approach worked.


Thanks a lot!


On 13 August 2015 at 01:06, Eugene Morozov  wrote:

> Yiannis,
>
> sorry for late response,
> It is indeed not possible to create new RDD inside of foreachPartitions,
> so you have to write data manually. I haven’t tried that and haven’t got
> such an exception, but I’d assume you might try to write locally and them
> upload it into HDFS. FileSystem has a specific method for that
> “copyFromLocalFile”.
>
> Another approach would be to try to split RDD into multiple RDDs by key.
> You can get distinct keys, collect them on driver and have a loop over they
> keys and filter out new RDD out of the original one by that key.
>
> for (key <- keys) {
>   rdd.filter { case (k, _) => k == key }.saveAsTextFile("output/" + key)
> }
>
> It might help to cache original rdd.
>
> On 16 Jul 2015, at 12:21, Yiannis Gkoufas  wrote:
>
> Hi Eugene,
>
> thanks for your response!
> Your recommendation makes sense, that's what I more or less tried.
> The problem that I am facing is that inside foreachPartition() I cannot
> create a new rdd and use saveAsTextFile.
> It would probably make sense to write directly to HDFS using the Java API.
> When I tried that I was getting errors similar to this:
>
> Failed on local exception: java.io.InterruptedIOException: Interruped
> while waiting for IO on channel java.nio.channels.SocketChannel
>
> Probably it's hitting a race condition.
>
> Has anyone else faced this situation? Any suggestions?
>
> Thanks a lot!
>
> On 15 July 2015 at 14:04, Eugene Morozov  wrote:
>
>> Yiannis ,
>>
>> It looks like you might explore another approach.
>>
>> sc.textFile("input/path")
>> .map() // your own implementation
>> .partitionBy(new HashPartitioner(num))
>> .groupBy() //your own implementation, as a result - PairRDD of key vs
>> Iterable of values
>> .foreachPartition()
>>
>> On the last step you could sort all values for the key and store them
>> into separate file even into the same directory of all other files for
>> other keys.
>> HashPartitioner must guarantee that all values for a specific key will
>> reside in just one partition, but it might happen that one partition
>> contains more than one key (with values). This I'm not sure about, but it
>> shouldn't be a big deal, as you would iterate over the Tuple2 of key vs
>> Iterable of values and store each key to a specific file.
>>
>> On 15 Jul 2015, at 03:23, Yiannis Gkoufas  wrote:
>>
>> Hi there,
>>
>> I have been using the approach described here:
>>
>>
>> http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
>>
>> In addition to that, I was wondering if there is a way to set the
>> customize the order of those values contained in each file.
>>
>> Thanks a lot!
>>
>>
>> Eugene Morozov
>> fathers...@list.ru
>>
>>
>>
>>
>>
>
> Eugene Morozov
> fathers...@list.ru
>
>
>
>
>


Re: Always two tasks slower than others, and then job fails

2015-08-14 Thread Zoltán Zvara
Data skew is still a problem with Spark.

- If you use groupByKey, try to express your logic by not using groupByKey.
- If you need to use groupByKey, all you can do is to scale vertically.
- If you can, repartition with a finer HashPartitioner. You will have many
tasks for each stage, but tasks are light-weight in Spark, so it should not
introduce a heavy overhead. If you have your own domain-partitioner, try to
rewrite it by introducing a secondary-key.

I hope I gave some insights and help.
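
As a rough illustration of the first and third points above (stand-in data; the partition count is arbitrary):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object SkewSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("skew-sketch"))
    // Stand-in data; imagine one key ("hot") carrying most of the records.
    val pairs = sc.parallelize(Seq(("hot", 1L), ("hot", 1L), ("", 1L), ("cold", 1L)))

    // Express the aggregation with reduceByKey so values are combined map-side,
    // instead of shipping every value for the hot key to a single task.
    val sums = pairs.reduceByKey(_ + _)

    // Drop degenerate keys (null / empty) up front and repartition with a finer
    // HashPartitioner so no single partition has to hold most of the data.
    val cleaned = pairs
      .filter { case (k, _) => k != null && k.nonEmpty }
      .partitionBy(new HashPartitioner(400))

    println(sums.count() + " distinct keys, " + cleaned.count() + " cleaned records")
    sc.stop()
  }
}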

On Fri, Aug 14, 2015 at 9:37 AM Jeff Zhang  wrote:

> Data skew ? May your partition key has some special value like null or
> empty string
>
> On Fri, Aug 14, 2015 at 11:01 AM, randylu  wrote:
>
>>   It is strange that there are always two tasks slower than others, and
>> the
>> corresponding partitions's data are larger, no matter how many partitions?
>>
>>
>> Executor ID Address Task Time   Shuffle Read Size
>> /
>> Records
>> 1   slave129.vsvs.com:56691 16 s1   99.5 MB / 18865432
>> *10 slave317.vsvs.com:59281 0 ms0   413.5 MB / 311001318*
>> 100 slave290.vsvs.com:60241 19 s1   110.8 MB / 27075926
>> 101 slave323.vsvs.com:36246 14 s1   126.1 MB / 25052808
>>
>>   Task time and records of Executor 10 seems strange, and the cpus on the
>> node are all 100% busy.
>>
>>   Anyone meets the same problem,  Thanks in advance for any answer!
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Always-two-tasks-slower-than-others-and-then-job-fails-tp24257.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: RDD.join vs spark SQL join

2015-08-14 Thread Akhil Das
Both work the same way, but with Spark SQL you will get the optimizations
etc. done by Catalyst. One important thing to consider is the number of
partitions and the key distribution (when you are doing RDD.join): if the
keys are not evenly distributed across machines then you can see the
process choking on a single task (more like it takes a lot more time for
one task to execute compared to the others in that stage).
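
A tiny side-by-side sketch of the two styles (toy data):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JoinStylesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join-styles"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val a = sc.parallelize(Seq((1, "x"), (2, "y")))
    val b = sc.parallelize(Seq((1, 10), (3, 30)))

    // RDD API: a shuffle join on the raw keys, no optimizer involved.
    val rddJoined = a.join(b)

    // DataFrame API: the same join, but planned by the Catalyst optimizer.
    val dfJoined = a.toDF("id", "s").join(b.toDF("id", "n"), "id")

    println(rddJoined.count() + " / " + dfJoined.count())
    sc.stop()
  }
}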

Thanks
Best Regards

On Fri, Aug 14, 2015 at 1:25 AM, Xiao JIANG  wrote:

> Hi,
>
> May I know the performance difference the rdd.join function and spark SQL
> join operation. If I want to join several big Rdds, how should I decide
> which one I should use? What are the factors to consider here?
>
> Thanks!
>


Re: saveToCassandra not working in Spark Job but works in Spark Shell

2015-08-14 Thread Akhil Das
Looks like a jar version conflict to me.
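
If it is the classic mismatch between the connector version you compiled against and the one on the runtime classpath, the usual remedy on the build side looks something like this (a sketch only; the version numbers are placeholders, and with DSE the bundled connector version is the one to match):

// build.sbt (sketch)
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "1.4.1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0"
)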

Thanks
Best Regards

On Thu, Aug 13, 2015 at 7:59 PM, satish chandra j 
wrote:

> HI,
> Please let me know if I am missing anything in the below mail, to get the
> issue fixed
>
> Regards,
> Satish Chandra
>
> On Wed, Aug 12, 2015 at 6:59 PM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> HI,
>>
>> The below mentioned code is working very well fine in Spark Shell but
>> when the same is placed in Spark Application it is errors as mentioned
>> below:
>>
>> *Exception in thread "main" java.lang.NoSuchMethodError:
>> com.datastax.spark.connector.package$.toRDDFunctions(Lorg/apache/spark/rdd/RDD;Lscala/reflect/ClassTag;)Lcom/datastax/spark/connector/RDDFunctions*
>>
>>
>> *Code:*
>>
>> *import* *org.apache*.spark.SparkContext
>>
>> *import* *org.apache*.spark.SparkContext._
>>
>> *import* *org.apache*.spark.SparkConf
>>
>> *import* *org.apache*.spark.rdd.JdbcRDD
>>
>> *import* *com.datastax*.spark.connector._
>>
>> *import* com.datastax.spark.connector.cql.CassandraConnector
>>
>> *import* com.datastax.bdp.spark.DseSparkConfHelper._
>>
>> *import* java.sql.{Connection, DriverManager, ResultSet,
>> PreparedStatement, SQLException, Statement}
>>
>> *object* HelloWorld {
>>
>> *def* main(args: Array[String]) {
>>
>>   *def* createSparkContext() = {
>>
>> *val* conf = *new* SparkConf().set(
>> "spark.cassandra.connection.host", "10.246.43.15")
>>
>>.setAppName("First Spark App")
>>
>>.setMaster("local")
>>
>>.set("cassandra.username", "username")
>>
>>.set("cassandra.password", "password")
>>
>>.forDse
>>
>>*new* SparkContext(conf)
>>
>> }
>>
>>
>>
>>   *val* sc = createSparkContext()
>>
>>   *val* user="user"
>>
>>   *val** pass=*"password"
>>
>>   Class.forName("org.postgresql.Driver").newInstance
>>
>>   *val* url = "jdbc:postgresql://gptester:5432/db_test"
>>
>>   *val* myRDD27 = *new* JdbcRDD( sc, ()=>
>> DriverManager.getConnection(url,user,pass),"select * from
>> wmax_vmax.arm_typ_txt LIMIT ? OFFSET ?",5,0,1,(r: ResultSet) =>
>> (r.getInt("alarm_type_code"),r.getString("language_code"),r.getString(
>> "alrm_type_cd_desc")))
>>
>>  * myRDD27.saveToCassandra(*
>> *"keyspace","arm_typ_txt",SomeColumns("alarm_type_code","language_code","alrm_type_cd_desc"))*
>>
>>   println(myRDD27.count())
>>
>>   println(myRDD27.first)
>>
>>   sc.stop()
>>
>>   sys.exit()
>>
>>
>>
>> }
>>
>>   }
>>
>>
>>
>> *Command: *
>> dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld
>> --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar etl-0.0.
>> 1-SNAPSHOT.jar
>>
>> Please let me know if any solutions for this issue
>>
>> Regards,
>> Satish Chandra
>>
>
>


Re: spark.files.userClassPathFirst=true Return Error - Please help

2015-08-14 Thread Akhil Das
Which version of spark are you using? Did you try with --driver-class-path
configuration?

Thanks
Best Regards

On Fri, Aug 14, 2015 at 2:05 PM, Kyle Lin  wrote:

> Hi all
>
> I had similar usage and also got the same problem.
>
> I guess Spark use some class in my user jars but actually it should use
> the class in spark-assembly-xxx.jar, but I don't know how to fix it.
>
> Kyle
>
>
>
> 2015-07-22 23:03 GMT+08:00 Ashish Soni :
>
>> Hi All ,
>>
>> I am getting below error when i use the --conf
>> spark.files.userClassPathFirst=true parameter
>>
>> Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times,
>> most recent failure: Lost task 3.3 in stage 0.0 (TID 32, 10.200.37.161):
>> java.lang.ClassCastException: cannot assign instance of scala.None$ to
>> field org.apache.spark.scheduler.Task.metrics of type scala.Option in
>> instance of org.apache.spark.scheduler.ResultTask
>>
>> I am using as below
>>
>> spark-submit --conf spark.files.userClassPathFirst=true --driver-memory
>> 6g --executor-memory 12g --executor-cores 4   --class
>> com.ericsson.engine.RateDriver --master local
>> /home/spark/workspace/simplerating/target/simplerating-0.0.1-SNAPSHOT.jar
>> spark://eSPARKMASTER:7077 hdfs://enamenode/user/spark
>>
>> thanks
>>
>
>


Re: spark.files.userClassPathFirst=true Return Error - Please help

2015-08-14 Thread Kyle Lin
Hi all

I had similar usage and also got the same problem.

I guess Spark use some class in my user jars but actually it should use the
class in spark-assembly-xxx.jar, but I don't know how to fix it.

Kyle



2015-07-22 23:03 GMT+08:00 Ashish Soni :

> Hi All ,
>
> I am getting below error when i use the --conf
> spark.files.userClassPathFirst=true parameter
>
> Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most
> recent failure: Lost task 3.3 in stage 0.0 (TID 32, 10.200.37.161):
> java.lang.ClassCastException: cannot assign instance of scala.None$ to
> field org.apache.spark.scheduler.Task.metrics of type scala.Option in
> instance of org.apache.spark.scheduler.ResultTask
>
> I am using as below
>
> spark-submit --conf spark.files.userClassPathFirst=true --driver-memory 6g
> --executor-memory 12g --executor-cores 4   --class
> com.ericsson.engine.RateDriver --master local
> /home/spark/workspace/simplerating/target/simplerating-0.0.1-SNAPSHOT.jar
> spark://eSPARKMASTER:7077 hdfs://enamenode/user/spark
>
> thanks
>


Re: Spark runs into an Infinite loop even if the tasks are completed successfully

2015-08-14 Thread Akhil Das
Thanks for the clarifications, Mridul.

Thanks
Best Regards

On Fri, Aug 14, 2015 at 1:04 PM, Mridul Muralidharan 
wrote:

> What I understood from Imran's mail (and what was referenced in his
> mail) the RDD mentioned seems to be violating some basic contracts on
> how partitions are used in spark [1].
> They cannot be arbitrarily numbered,have duplicates, etc.
>
>
> Extending RDD to add functionality is typically for niche cases; and
> requires subclasses to adhere to the explicit (and implicit)
> contracts/lifecycles for them.
> Using existing RDD's as template would be a good idea for
> customizations - one way to look at it is, using RDD is more in api
> space but extending them is more in spi space.
>
> Violations would actually not even be detectable by spark-core in general
> case.
>
>
> Regards,
> Mridul
>
> [1] Ignoring the array out of bounds, etc - I am assuming the intent
> is to show overlapping partitions, duplicates. index to partition
> mismatch - that sort of thing.
>
>
> On Thu, Aug 13, 2015 at 11:42 PM, Akhil Das 
> wrote:
> > Yep, and it works fine for operations which does not involve any shuffle
> > (like foreach,, count etc) and those which involves shuffle operations
> ends
> > up in an infinite loop. Spark should somehow indicate this instead of
> going
> > in an infinite loop.
> >
> > Thanks
> > Best Regards
> >
> > On Thu, Aug 13, 2015 at 11:37 PM, Imran Rashid 
> wrote:
> >>
> >> oh I see, you are defining your own RDD & Partition types, and you had a
> >> bug where partition.index did not line up with the partitions slot in
> >> rdd.getPartitions.  Is that correct?
> >>
> >> On Thu, Aug 13, 2015 at 2:40 AM, Akhil Das 
> >> wrote:
> >>>
> >>> I figured that out, And these are my findings:
> >>>
> >>> -> It just enters in an infinite loop when there's a duplicate
> partition
> >>> id.
> >>>
> >>> -> It enters in an infinite loop when the partition id starts from 1
> >>> rather than 0
> >>>
> >>>
> >>> Something like this piece of code can reproduce it: (in
> getPartitions())
> >>>
> >>> val total_partitions = 4
> >>> val partitionsArray: Array[Partition] =
> >>> Array.ofDim[Partition](total_partitions)
> >>>
> >>> var i = 0
> >>>
> >>> for(outer <- 0 to 1){
> >>>   for(partition <- 1 to total_partitions){
> >>> partitionsArray(i) = new DeadLockPartitions(partition)
> >>> i = i + 1
> >>>   }
> >>> }
> >>>
> >>> partitionsArray
> >>>
> >>>
> >>>
> >>>
> >>> Thanks
> >>> Best Regards
> >>>
> >>> On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid 
> >>> wrote:
> 
>  yikes.
> 
>  Was this a one-time thing?  Or does it happen consistently?  can you
>  turn on debug logging for o.a.s.scheduler (dunno if it will help, but
> maybe
>  ...)
> 
>  On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das <
> ak...@sigmoidanalytics.com>
>  wrote:
> >
> > Hi
> >
> > My Spark job (running in local[*] with spark 1.4.1) reads data from a
> > thrift server(Created an RDD, it will compute the partitions in
> > getPartitions() call and in computes hasNext will return records
> from these
> > partitions), count(), foreach() is working fine it returns the
> correct
> > number of records. But whenever there is shuffleMap stage (like
> reduceByKey
> > etc.) then all the tasks are executing properly but it enters in an
> infinite
> > loop saying :
> >
> > 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1
> > (map at FilterMain.scala:59) because some of its tasks had failed:
> 0, 3
> >
> >
> > Here's the complete stack-trace http://pastebin.com/hyK7cG8S
> >
> > What could be the root cause of this problem? I looked up and bumped
> > into this closed JIRA (which is very very old)
> >
> >
> >
> >
> > Thanks
> > Best Regards
> 
> 
> >>>
> >>
> >
>


Re: matrix inverse and multiplication

2015-08-14 Thread go canal
Correction: I am not able to convert the Scala statement to java.

Re: Using unserializable classes in tasks

2015-08-14 Thread Dawid Wysakowicz
No, the connector does not need to be serializable because it is constructed
on the worker. Only objects shuffled across partitions need to be
serializable.
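
A minimal sketch of that pattern (the sink class is a hypothetical stand-in for a Cassandra session or similar):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical non-serializable sink, standing in for a Cassandra session etc.
class UnserializableSink {
  def save(x: Int): Unit = println(x)
  def close(): Unit = ()
}

object PerPartitionSink {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("per-partition-sink"))
    sc.parallelize(1 to 100).foreachPartition { records =>
      val sink = new UnserializableSink()  // created on the executor, never serialized
      try records.foreach(sink.save) finally sink.close()
    }
    sc.stop()
  }
}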

2015-08-14 9:40 GMT+02:00 mark :

> I guess I'm looking for a more general way to use complex graphs of
> objects that cannot be serialized in a task executing on a worker, not just
> DB connectors. Something like shipping jars to the worker maybe?
>
> I'm not sure I understand how your foreach example solves the issue - the
> Connector there would still need to be serializable surely?
>
> Thanks
> On 14 Aug 2015 8:32 am, "Dawid Wysakowicz" 
> wrote:
>
>> I am not an expert, but first of all check whether there is a ready-made
>> connector (you mentioned Cassandra - check: spark-cassandra-connector).
>>
>> If you really want to do something on your own, all objects constructed in
>> the passed function will be allocated on the worker.
>> For example:
>>
>> sc.parallelize((1 to 100)).foreach(x => new Connector().save(x))
>> but this way you allocate resources frequently (one Connector per record)
>>
>> 2015-08-14 9:05 GMT+02:00 mark :
>>
>>> I have a Spark job that computes some values and needs to write those
>>> values to a data store. The classes that write to the data store are not
>>> serializable (eg, Cassandra session objects etc).
>>>
>>> I don't want to collect all the results at the driver, I want each
>>> worker to write the data - what is the suggested approach for using code
>>> that can't be serialized in a task?
>>>
>>
>>


Re: Always two tasks slower than others, and then job fails

2015-08-14 Thread Jeff Zhang
Data skew? Maybe your partition key has some special value like null or an
empty string.
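
A quick way to test that hypothesis - a sketch for the spark-shell, where sc
is predefined and `pairs` stands in for whatever keyed RDD feeds the shuffle -
is to count records per key and look at the heaviest keys:

// `pairs` stands in for the real keyed RDD that feeds the shuffle.
val pairs = sc.parallelize(Seq("a" -> 1, "a" -> 2, "" -> 3, "" -> 4, "" -> 5, "b" -> 6))

val keyCounts = pairs
  .map { case (k, _) => (k, 1L) }   // one count per record
  .reduceByKey(_ + _)               // records per key

// Heaviest keys first - nulls and empty strings usually show up at the top.
keyCounts.sortBy(_._2, ascending = false).take(10).foreach(println)

If one or two keys dominate, the usual mitigations are handling those keys
separately or salting the key (adding a random suffix before the shuffle and
aggregating the partial results afterwards).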

On Fri, Aug 14, 2015 at 11:01 AM, randylu  wrote:

>   It is strange that there are always two tasks slower than the others, and
> the corresponding partitions' data are larger, no matter how many partitions
> there are.
>
>
> Executor ID Address Task Time   Shuffle Read Size /
> Records
> 1   slave129.vsvs.com:56691 16 s1   99.5 MB / 18865432
> *10 slave317.vsvs.com:59281 0 ms0   413.5 MB / 311001318*
> 100 slave290.vsvs.com:60241 19 s1   110.8 MB / 27075926
> 101 slave323.vsvs.com:36246 14 s1   126.1 MB / 25052808
>
>   Task time and records of Executor 10 seems strange, and the cpus on the
> node are all 100% busy.
>
>   Anyone meets the same problem,  Thanks in advance for any answer!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Always-two-tasks-slower-than-others-and-then-job-fails-tp24257.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


Re: Spark runs into an Infinite loop even if the tasks are completed successfully

2015-08-14 Thread Mridul Muralidharan
What I understood from Imran's mail (and what was referenced in it) is that
the RDD mentioned seems to be violating some basic contracts on how
partitions are used in Spark [1]:
they cannot be arbitrarily numbered, have duplicates, etc.


Extending RDD to add functionality is typically for niche cases, and it
requires subclasses to adhere to the explicit (and implicit)
contracts/lifecycles for them.
Using existing RDDs as a template is a good idea for customizations - one way
to look at it is that using RDDs is more in API space, while extending them
is more in SPI space.

Violations would actually not even be detectable by spark-core in the general case.


Regards,
Mridul

[1] Ignoring the array out of bounds, etc. - I am assuming the intent
is to show overlapping partitions, duplicates, index-to-partition
mismatch - that sort of thing.
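
To make that contract concrete, here is a minimal sketch of a custom RDD
(class names are hypothetical, and compute() is only a placeholder) whose
getPartitions() lines up with what spark-core expects: N slots, indices 0 to
N-1, each partition stored in the slot matching its index, no duplicates:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type - its index must match the slot it occupies below.
class MyPartition(val index: Int) extends Partition

// Sketch of a custom RDD whose partitions respect the contract.
class MyCustomRDD(sparkCtx: SparkContext, numParts: Int) extends RDD[Int](sparkCtx, Nil) {

  // Indices run 0 .. numParts-1 with no gaps or duplicates, and the
  // partition stored at slot i has index i.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numParts)(i => new MyPartition(i))

  // Placeholder compute - a real RDD would return the data for this partition.
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator.single(split.index)
}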


On Thu, Aug 13, 2015 at 11:42 PM, Akhil Das  wrote:
> Yep, and it works fine for operations which do not involve any shuffle
> (like foreach, count, etc.), while those which involve shuffle operations end
> up in an infinite loop. Spark should somehow indicate this instead of going
> into an infinite loop.
>
> Thanks
> Best Regards
>
> On Thu, Aug 13, 2015 at 11:37 PM, Imran Rashid  wrote:
>>
>> oh I see, you are defining your own RDD & Partition types, and you had a
>> bug where partition.index did not line up with the partitions slot in
>> rdd.getPartitions.  Is that correct?
>>
>> On Thu, Aug 13, 2015 at 2:40 AM, Akhil Das 
>> wrote:
>>>
>>> I figured that out, And these are my findings:
>>>
>>> -> It just enters an infinite loop when there's a duplicate partition
>>> id.
>>>
>>> -> It enters an infinite loop when the partition id starts from 1
>>> rather than 0
>>>
>>>
>>> Something like this piece of code can reproduce it: (in getPartitions())
>>>
>>> val total_partitions = 4
>>> val partitionsArray: Array[Partition] =
>>> Array.ofDim[Partition](total_partitions)
>>>
>>> var i = 0
>>>
>>> for(outer <- 0 to 1){
>>>   for(partition <- 1 to total_partitions){
>>> partitionsArray(i) = new DeadLockPartitions(partition)
>>> i = i + 1
>>>   }
>>> }
>>>
>>> partitionsArray
>>>
>>>
>>>
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid 
>>> wrote:

 yikes.

 Was this a one-time thing?  Or does it happen consistently?  can you
 turn on debug logging for o.a.s.scheduler (dunno if it will help, but maybe
 ...)

 On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das 
 wrote:
>
> Hi
>
> My Spark job (running in local[*] with Spark 1.4.1) reads data from a
> thrift server (I created an RDD that computes the partitions in the
> getPartitions() call, and in compute() its hasNext returns records from
> these partitions). count() and foreach() are working fine and return the
> correct number of records. But whenever there is a shuffle map stage (like
> reduceByKey etc.), all the tasks execute properly but it enters an
> infinite loop saying:
>
> 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1
> (map at FilterMain.scala:59) because some of its tasks had failed: 0, 3
>
>
> Here's the complete stack-trace http://pastebin.com/hyK7cG8S
>
> What could be the root cause of this problem? I looked up and bumped
> into this closed JIRA (which is very very old)
>
>
>
>
> Thanks
> Best Regards


>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Using unserializable classes in tasks

2015-08-14 Thread Dawid Wysakowicz
-- Forwarded message --
From: Dawid Wysakowicz 
Date: 2015-08-14 9:32 GMT+02:00
Subject: Re: Using unserializable classes in tasks
To: mark 


I am not an expert, but first of all check whether there is a ready-made
connector (you mentioned Cassandra - check: spark-cassandra-connector).

If you really want to do something on your own, all objects constructed in the
passed function will be allocated on the worker.
For example:

sc.parallelize((1 to 100)).foreach(x => new Connector().save(x))
but this way you allocate resources frequently (one Connector per record).
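
If allocating per record (or even per partition) is too frequent, another
common pattern is a lazily initialized singleton, so each executor JVM builds
the client at most once and reuses it across tasks. A sketch with the same
hypothetical Connector class (this relies on the job being compiled as an
application; REPL-defined objects are wrapped differently):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical non-serializable client.
class Connector {
  def save(x: Int): Unit = println(s"saved $x")
}

// The holder object is never shipped over the wire; each executor JVM
// initializes the lazy val the first time a task on that executor touches it.
object ConnectorHolder {
  lazy val connector: Connector = new Connector()
}

object LazySingletonExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lazy-connector").setMaster("local[*]"))

    sc.parallelize(1 to 100).foreach { x =>
      // Reused across records and across tasks running in the same executor JVM.
      ConnectorHolder.connector.save(x)
    }

    sc.stop()
  }
}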

2015-08-14 9:05 GMT+02:00 mark :

> I have a Spark job that computes some values and needs to write those
> values to a data store. The classes that write to the data store are not
> serializable (eg, Cassandra session objects etc).
>
> I don't want to collect all the results at the driver, I want each worker
> to write the data - what is the suggested approach for using code that
> can't be serialized in a task?
>


Using unserializable classes in tasks

2015-08-14 Thread mark
I have a Spark job that computes some values and needs to write those
values to a data store. The classes that write to the data store are not
serializable (eg, Cassandra session objects etc).

I don't want to collect all the results at the driver, I want each worker
to write the data - what is the suggested approach for using code that
can't be serialized in a task?


Re:Re: Materials for deep insight into Spark SQL

2015-08-14 Thread Todd
Thanks Ted for the help!




At 2015-08-14 12:02:14, "Ted Yu"  wrote:

You can look under Developer Track:
https://spark-summit.org/2015/#day-1


http://www.slideshare.net/jeykottalam/spark-sqlamp-camp2014?related=1 (slightly 
old)



Catalyst design:
https://docs.google.com/a/databricks.com/document/d/1Hc_Ehtr0G8SQUg69cmViZsMi55_Kf3tISD9GPGU5M1Y/edit



FYI


On Thu, Aug 13, 2015 at 8:54 PM, Todd  wrote:

Hi,
I would like to ask whether there are slides, blogs, or videos on how
Spark SQL is implemented - the process, or the whole picture of what happens
when Spark SQL executes the code. Thanks!
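
As a small hands-on complement to those materials - a sketch assuming the
Spark 1.4-era spark-shell, where sc and sqlContext are predefined - printing
the Catalyst plans for a simple query shows a good part of that picture:

// Assumes the spark-shell, where sc and sqlContext are already defined.
import sqlContext.implicits._

val df = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))).toDF("id", "name")

// extended = true prints the parsed, analyzed and optimized logical plans
// plus the physical plan that the planner selects for execution.
df.filter($"id" > 1).select($"name").explain(true)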