Strongly Connected Components

2016-11-08 Thread Shreya Agarwal
Hi,

I am running this on a graph with >5B edges and >3B vertices and have 2 questions -


  1.  What is the optimal number of iterations?
  2.  I am running it for 1 iteration right now on a beefy 100 node cluster, 
with 300 executors each having 30GB RAM and 5 cores. I have persisted the graph 
to MEMORY_AND_DISK. And it has been running for 3 hours already. Any ideas on 
how to speed this up?

Regards,
Shreya


Splines or Smoothing Kernels for Linear Regression

2016-11-08 Thread Tobi Bosede
Hi fellow users,

Has anyone ever used splines or smoothing kernels for linear regression in
Spark? If not, does anyone have ideas on how this can be done or what
suitable alternatives exist? I am on Spark 1.6.1 with python.

Thanks,
Tobi


Re: Any Dynamic Compilation of Scala Query

2016-11-08 Thread Mahender Sarangam
Hi Kiran,

Thanks for responding. We would like to know how the industry deals with scenarios 
like UPDATE in Spark. Here is our scenario, Manjunath: we are in the process of 
migrating our SQL Server data to Spark. We have our logic in stored procedures, 
where we dynamically create a SQL string and execute it (dynamic SQL). We would 
like to build the same dynamic string, submit it to the Hive context and 
execute it.

Here is the query in SQL

UPDATE table1
   SET X = A,
       Y = B
  FROM Table1
 WHERE ISNULL([Z], '') <> ''
   AND [ColumnW] NOT IN ('X', 'ACD', 'A', 'B', 'C')
   AND [ColumnA] IS NULL


We would like to convert this using Spark SQL. The other way I can think of is 
using a DataFrame with withColumn and a WHEN condition for each column (i.e. X 
and Y); here the WHEN condition repeats the same code, based on the WHERE clause 
above, for each column (see the sketch below). I would like to know the industry 
practice for this kind of scenario.
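
For reference, a minimal sketch of that DataFrame approach, assuming a DataFrame 
named table1 with the columns used in the SQL above (the column names come from 
the query; everything else is illustrative):

import org.apache.spark.sql.functions.{when, col, coalesce, lit}

// The WHERE clause becomes a single reusable Column, so it is written only once
val cond = coalesce(col("Z"), lit("")).notEqual("") &&
           !col("ColumnW").isin("X", "ACD", "A", "B", "C") &&
           col("ColumnA").isNull

// Each SET X = A becomes: take A where the condition holds, otherwise keep the old X
val updated = table1
  .withColumn("X", when(cond, col("A")).otherwise(col("X")))
  .withColumn("Y", when(cond, col("B")).otherwise(col("Y")))

Factoring the condition into one Column value is what removes the repetitive code per column.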

On 10/26/2016 4:09 AM, Manjunath, Kiran wrote:
Hi,

Can you elaborate with sample example on why you would want to do so?
Ideally there would be a better approach than solving such problems as 
mentioned below.

A sample example would help to understand the problem.

Regards,
Kiran

From: Mahender Sarangam 

Date: Wednesday, October 26, 2016 at 2:05 PM
To: user 
Subject: Any Dynamic Compilation of Scala Query

Hi,

Is there any way to dynamically execute a string which has Scala code
against the Spark engine? We are dynamically creating a Scala file and would
like to submit it to Spark, but currently Spark accepts
only a JAR file as input for remote job submission. Is there any other
way to submit a .scala file instead of a .jar to the REST API of Spark?

/MS





spark ml - ngram - how to preserve single word (1-gram)

2016-11-08 Thread Nirav Patel
Is it possible to preserve the single tokens (1-grams) while using the n-gram
feature transformer?

e.g.

Array("Hi", "I", "heard", "about", "Spark")

Becomes

Array("Hi", "i", "heard", "about", "Spark", "Hi i", "I heard", "heard
about", "about Spark")

Currently, if I want to do this, I have to manually transform the column first
using the current NGram implementation and then join the 1-gram tokens to each
column value; basically I have to do this outside of the pipeline.
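
For reference, a minimal sketch of that manual step, assuming a DataFrame df with 
an array<string> column named "tokens" (column names are illustrative):

import org.apache.spark.ml.feature.NGram
import org.apache.spark.sql.functions.{col, udf}

// Generate the 2-grams with the stock NGram transformer...
val bigram = new NGram().setN(2).setInputCol("tokens").setOutputCol("bigrams")
val withBigrams = bigram.transform(df)

// ...then append the original tokens (the 1-grams) with a small UDF
val concatArrays = udf { (unigrams: Seq[String], bigrams: Seq[String]) => unigrams ++ bigrams }
val result = withBigrams.withColumn("ngrams", concatArrays(col("tokens"), col("bigrams")))

To keep this inside a Pipeline, the UDF step would have to be wrapped in a custom Transformer.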




Re: Save a spark RDD to disk

2016-11-08 Thread Andrew Holway
That's around 750 MB/s, which seems quite respectable even in this day and age!

How many and what kind of disks do you have attached to your nodes? What
are you expecting?

On Tue, Nov 8, 2016 at 11:08 PM, Elf Of Lothlorein 
wrote:

> Hi
> I am trying to save a RDD to disk and I am using the
> saveAsNewAPIHadoopFile for that. I am seeing that it takes almost 20 mins
> for about 900 GB of data. Is there any parameter that I can tune to make
> this saving faster.
> I am running about 45 executors with 5 cores each on 5 Spark worker nodes
> and using Spark on YARN for this..
> Thanks for your help.
> C
>



-- 
Otter Networks UG
http://otternetworks.de
Gotenstraße 17
10829 Berlin


Save a spark RDD to disk

2016-11-08 Thread Elf Of Lothlorein
Hi
I am trying to save an RDD to disk and I am using saveAsNewAPIHadoopFile
for that. I am seeing that it takes almost 20 minutes for about 900 GB of
data. Is there any parameter that I can tune to make this saving faster?
I am running about 45 executors with 5 cores each on 5 Spark worker nodes
and using Spark on YARN for this..
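
For reference, a minimal sketch of the two knobs that usually matter here: the 
number of output partitions (each partition is written as one file, in parallel) 
and output compression. The key/value types, partition count, codec and output 
path below are illustrative assumptions, not taken from the original job:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.io.compress.SnappyCodec
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// assumes rdd: RDD[(NullWritable, Text)]
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", classOf[SnappyCodec].getName)

rdd
  .repartition(450)   // roughly 2x the total executor cores, so every core writes several files
  .saveAsNewAPIHadoopFile(
    "hdfs:///tmp/output",
    classOf[NullWritable], classOf[Text],
    classOf[TextOutputFormat[NullWritable, Text]],
    hadoopConf)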
Thanks for your help.
C


How Spark determines Parquet partition size

2016-11-08 Thread Selvam Raman
Hi,

Can you please tell me how Parquet partitions the data while saving the
dataframe?

I have a dataframe which contains 10 values like below

+---------+
|field_num|
+---------+
|      139|
|      140|
|       40|
|       41|
|      148|
|      149|
|      151|
|      152|
|      153|
|      154|
+---------+


df.write.partitionBy("field_num").parquet("/Users/rs/parti/")

It saves the files into directories like (field_num=140, ..., field_num=154).


When I try the command below, it gives 5.

scala> spark.read.parquet("file:///Users/rs/parti").rdd.partitions.length

res4: Int = 5


So how does Parquet partition the data in Spark?
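
For reference, a sketch of the Spark 2.x settings that drive the read-side 
partition count (the values shown are the defaults). df.write.partitionBy only 
controls the directory layout; on read, the files are roughly bin-packed into 
splits of at most spark.sql.files.maxPartitionBytes, with each small file 
additionally charged spark.sql.files.openCostInBytes, which is why ten tiny 
files can end up in only five partitions:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// defaults shown; changing them changes how files are packed into read partitions
spark.conf.set("spark.sql.files.maxPartitionBytes", 134217728L)  // at most 128 MB per split
spark.conf.set("spark.sql.files.openCostInBytes", 4194304L)      // each file "costs" an extra 4 MB

val df = spark.read.parquet("file:///Users/rs/parti")
println(df.rdd.partitions.length)   // varies with the settings above and the file sizes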


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Convert RDD of numpy matrices to Dataframes

2016-11-08 Thread aditya1702
Hello,
I am trying out the MultilayerPerceptronClassifier and it takes only a
dataframe in its train method. The problem is that I have a training RDD
of (x, y) pairs with x and y being matrices: x has dimensions (1, 401) while
y has dimensions (1, 10). I need to convert the training RDD to a dataframe,
but on doing so I get no input in my dataframe.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Convert-RDD-of-numpy-matrices-to-Dataframes-tp28050.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



GraphX and Public Transport Shortest Paths

2016-11-08 Thread Gerard Casey
Hi all,

I’m doing a quick lit review.

Consider I have a graph that has link weights dependent on time. I.e., a bus on 
this road gives a journey time (link weight) of x at time y. This is a classic 
public transport shortest path problem. 

This is a weighted directed graph that is time dependent. Are there any 
resources relating to a shortest-path algorithm for such a graph? I suspect 
someone may have done this using GTFS data in some way.

Cheers

Geroid
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: GraphX Connected Components

2016-11-08 Thread Robineast
Have you tried this?
https://spark.apache.org/docs/2.0.1/api/scala/index.html#org.apache.spark.graphx.GraphLoader$



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Connected-Components-tp10869p28049.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Anomalous Spark RDD persistence behavior

2016-11-08 Thread Dave Jaffe
No, I am not using serialization with either memory or disk.

Dave Jaffe
VMware
dja...@vmware.com

From: Shreya Agarwal 
Date: Monday, November 7, 2016 at 3:29 PM
To: Dave Jaffe , "user@spark.apache.org" 

Subject: RE: Anomalous Spark RDD persistence behavior

I don’t think this is correct. Unless you are serializing when caching to 
memory but not serializing when persisting to disk. Can you check?

Also, I have seen the behavior where if I have 100 GB in-memory cache and I use 
60 GB to persist something (MEMORY_AND_DISK). Then try to persist another RDD 
with MEMORY_AND_DISK option which is much greater than the remaining 40 GB 
(lets say 1 TB), my executors start getting killed at one point. During this 
period, the memory usage goes above 100GB and after some extra usage it fails. 
It seems like Spark is trying to cache this new RDD to memory and move the old 
one out to disk. But it is not able to move the old one out fast enough and 
crashes with OOM. Anyone seeing that?

From: Dave Jaffe [mailto:dja...@vmware.com]
Sent: Monday, November 7, 2016 2:07 PM
To: user@spark.apache.org
Subject: Anomalous Spark RDD persistence behavior

I’ve been studying Spark RDD persistence with spark-perf 
(https://github.com/databricks/spark-perf),
 especially when the dataset size starts to exceed available memory.

I’m running Spark 1.6.0 on YARN with CDH 5.7. I have 10 NodeManager nodes, each 
with 16 vcores and 32 GB of container memory. So I’m running 39 executors with 
4 cores and 8 GB each (6 GB spark.executor.memory and 2 GB 
spark.yarn.executor.memoryOverhead). I am using the default values for 
spark.memory.fraction and spark.memory.storageFraction so I end up with 3.1 GB 
available for caching RDDs, for a total of about 121 GB.

I’m running a single Random Forest test, with 500 features and up to 40 million 
examples, with 1 partition per core or 156 total partitions. The code (at line 
https://github.com/databricks/spark-perf/blob/master/mllib-tests/v1p5/src/main/scala/mllib/perf/MLAlgorithmTests.scala#L653)
 caches the input RDD immediately after creation. At 30M examples this fits 
into memory with all 156 partitions cached, with a total 113.4 GB in memory, or 
4 blocks of about 745 MB each per executor. So far so good.

At 40M examples, I expected about 3 partitions to fit in memory per executor, 
or 75% to be cached. However, I found only 3 partitions across the cluster were 
cached, or 2%, for a total size in memory of 2.9GB. Three of the executors had 
one block of 992 MB cached, with 2.1 GB free (enough for 2 more blocks). The 
other 36 held no blocks, with 3.1 GB free (enough for 3 blocks). Why this 
dramatic falloff?

I thought this might improve if I changed the persistence to MEMORY_AND_DISK. 
Unfortunately, the executor memory was then exceeded (“Container killed by YARN 
for exceeding memory limits. 8.9 GB of 8 GB physical memory used”) and the run 
ground to a halt. Why does persisting to disk take more memory than caching to 
memory?

Is this behavior expected as dataset size exceeds available memory?

Thanks in advance,

Dave Jaffe
Big Data Performance
VMware
dja...@vmware.com




Running

2016-11-08 Thread rurbanow
Hi,
I have a little Hadoop, Hive, Spark and Hue setup. I am using Hue to try to run
the sample Spark notebook. I get the following error messages:

The Spark session could not be created in the cluster: timeout

or

"Session '-1' not found." (error 404)

Spark and livy are up and running.  Searching the web for these error
messages tells me this is a permissions issue relating to the application
history folder and the spark-warehouse directory. On my system, both
directories have 777 permissions.


I also try this:

curl -X POST --data '{"kind": "pyspark"}' -H "Content-Type:
application/json" localhost:8998/sessions

Everything starts, and then I get the following in Hue:

16/10/21 11:17:04 INFO ContextLauncher: 16/10/21 11:17:04 INFO
hive.HiveSharedState: Warehouse path is '/usr/local/spark/spark-warehouse'.
16/10/21 11:17:04 INFO ContextLauncher: 16/10/21 11:17:04 INFO
server.ServerConnector: Stopped
ServerConnector@6d5031fe{HTTP/1.1}{0.0.0.0:4040}
16/10/21 11:17:04 INFO ContextLauncher: 16/10/21 11:17:04 INFO
handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@574c1328{/stages/stage/kill,null,UNAVAILABLE}
16/10/21 11:17:04 INFO ContextLauncher: 16/10/21 11:17:04 INFO
handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@683e60e7{/api,null,UNAVAILABLE}
16/10/21 11:17:04 INFO ContextLauncher: 16/10/21 11:17:04 INFO
handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@32780f73{/,null,UNAVAILABLE}
16/10/21 11:17:04 INFO ContextLauncher: 16/10/21 11:17:04 INFO
handler.ContextHandler: Stopped
o.s.j.s.ServletContextHandler@37faa18{/static,null,UNAVAILABLE}

This goes on with stopping everything and shutting down other processes.

spark-warehouse folder exists with 777 permissions.
Does anyone have any insight as to why this is happening? Thank you.
-rob



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-tp28046.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: mapWithState with Datasets

2016-11-08 Thread Daniel Haviv
Scratch that, it's working fine.

Thank you.

On Tue, Nov 8, 2016 at 8:19 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I should have used transform instead of map
>
> val x: DStream[(String, Record)] = 
> kafkaStream.transform(x=>{x.map(_._2)}).transform(x=>{sqlContext.read.json(x).as[Record].map(r=>(r.iid,r))}.rdd)
>
> but I'm still unable to call mapWithState on x.
>
> any idea why ?
>
> Thank you,
> Daniel
>
>
>
> On Tue, Nov 8, 2016 at 7:46 PM, Daniel Haviv  com> wrote:
>
>> Hi,
>> I'm trying to make a stateful stream of Tuple2[String, Dataset[Record]] :
>>
>> val kafkaStream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
>> val stateStream: DStream[RDD[(String, Record)]] = kafkaStream.map(x=> {  
>> sqlContext.read.json(x._2).as[Record]}).map(x=>{x.map(r=>(r.iid,r)).rdd})
>>
>>
>> Because stateStream is a DStream[RDD[(String, Record)]] I can't call 
>> mapWithState on it.
>> How can I map it to a DStream[(String,Record)] ?
>>
>> Thank you,
>> Daniel
>>
>>
>


Re: mapWithState with Datasets

2016-11-08 Thread Daniel Haviv
Hi,
I should have used transform instead of map

val x: DStream[(String, Record)] =
kafkaStream.transform(x=>{x.map(_._2)}).transform(x=>{sqlContext.read.json(x).as[Record].map(r=>(r.iid,r))}.rdd)

but I'm still unable to call mapWithState on x.

any idea why ?

Thank you,
Daniel



On Tue, Nov 8, 2016 at 7:46 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm trying to make a stateful stream of Tuple2[String, Dataset[Record]] :
>
> val kafkaStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
> val stateStream: DStream[RDD[(String, Record)]] = kafkaStream.map(x=> {  
> sqlContext.read.json(x._2).as[Record]}).map(x=>{x.map(r=>(r.iid,r)).rdd})
>
>
> Because stateStream is a DStream[RDD[(String, Record)]] I can't call 
> mapWithState on it.
> How can I map it to a DStream[(String,Record)] ?
>
> Thank you,
> Daniel
>
>


mapWithState with Datasets

2016-11-08 Thread Daniel Haviv
Hi,
I'm trying to make a stateful stream of Tuple2[String, Dataset[Record]] :

val kafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
val stateStream: DStream[RDD[(String, Record)]] = kafkaStream.map(x=>
{  sqlContext.read.json(x._2).as[Record]}).map(x=>{x.map(r=>(r.iid,r)).rdd})


Because stateStream is a DStream[RDD[(String, Record)]] I can't call
mapWithState on it.
How can I map it to a DStream[(String,Record)] ?
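
For reference, a sketch along the lines of the transform-based fix discussed in 
the replies above. It assumes a case class Record with an iid field, the 
sqlContext implicits for the encoders, and a "keep the latest record per key" 
state, all of which are illustrative:

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// transform works on plain RDDs, so the result is a DStream of pairs rather than of RDDs
val pairs: DStream[(String, Record)] = kafkaStream.transform { rdd =>
  sqlContext.read.json(rdd.map(_._2)).as[Record].map(r => (r.iid, r)).rdd
}

// mapWithState is then available on the key/value DStream
val mappingFunc = (iid: String, newRecord: Option[Record], state: State[Record]) => {
  newRecord.foreach(state.update)   // keep the most recent Record per key
  (iid, state.get)                  // safe here: without timeouts this runs only for keys with new data
}
val stateful: DStream[(String, Record)] = pairs.mapWithState(StateSpec.function(mappingFunc))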

Thank you,
Daniel


read large number of files on s3

2016-11-08 Thread Xiaomeng Wan
Hi,
We have 30 million small (100k each) files on s3 to process. I am thinking
about something like below to load them in parallel

val data = sc.union(sc.wholeTextFiles("s3a://.../*.json").map(...)
.toDF().createOrReplaceTempView("data")

How should I estimate the driver memory it needs? Is there a better
practice, or should I merge them in a preprocessing step? Thanks in advance.
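
For comparison, a sketch of going through the JSON datasource directly instead 
of sc.union over wholeTextFiles, which keeps the data itself off the driver (the 
driver still has to list the 30M objects). The bucket path and schema fields 
below are placeholders:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Supplying the schema up front avoids a second full pass over the files to infer it
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("payload", StringType)
))

val data = spark.read.schema(schema).json("s3a://bucket/prefix/")
data.createOrReplaceTempView("data")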

Regards,
Shawn


Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-08 Thread Masood Krohy
No, you do not scale back the predicted value. The output values (labels) 
were never scaled; only input features were scaled.

For prediction on new samples, you scale the new sample first using the 
avg/std that you calculated for each feature when you trained your model, 
then feed it to the trained model. If it's a classification problem, then 
you're done here, a class is predicted based on the trained model. If it's 
a regression problem, then the predicted value does not need scaling back; 
it is in the same scale as your original output values you used when you 
trained your model.
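
For reference, a minimal sketch of that scaling step with the ML StandardScaler 
linked below, assuming DataFrames train and newSamples that each have a Vector 
column named "features":

import org.apache.spark.ml.feature.StandardScaler

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithMean(true)   // subtract the per-feature average
  .setWithStd(true)    // divide by the per-feature standard deviation

val scalerModel = scaler.fit(train)               // avg/std are learned on the training data only
val scaledTrain = scalerModel.transform(train)

// the SAME fitted model is applied to new samples at prediction time
val scaledNew = scalerModel.transform(newSamples)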

This is now becoming more of a Data Science/ML problem and not a Spark 
issue and is probably best kept off this list. Do some reading on the 
topic and get back to me direct; I'll respond when possible.

Hope this has helped.

Masood

--
Masood Krohy, Ph.D. 
Data Scientist, Intact Lab-R 
Intact Financial Corporation 
http://ca.linkedin.com/in/masoodkh 



From:    Carlo.Allocca 
To:      Masood Krohy 
Cc:      Carlo.Allocca , Mohit Jaggi 
, "user@spark.apache.org" 
Date:    2016-11-08 11:02
Subject: Re: LinearRegressionWithSGD and Rank Features By Importance



Hi Masood, 

Thank you again for your suggestion. 
I have got a question about the following: 

For prediction on new samples, you need to scale each sample first before 
making predictions using your trained model. 


When applying the ML linear model as suggested above, it means that the 
predicted value is scaled. My question: does it need to be scaled back? I 
mean, should I apply the inverse of "calculate the average and std for each 
feature, deduct the avg, then divide by std" to the predicted value, i.e., 
in practice, (predicted-value * std) + avg?

Is that correct? Am I missing anything?

Many Thanks in advance. 
Best Regards,
Carlo


On 7 Nov 2016, at 17:14, carlo allocca  wrote:

I found it just google 
http://sebastianraschka.com/Articles/2014_about_feature_scaling.html 

Thanks.
Carlo
On 7 Nov 2016, at 17:12, carlo allocca  wrote:

Hi Masood, 

Thank you very much for your insight. 
I am going to scale all my features as you described. 

As I am a beginner, is there any paper/book that would explain the 
suggested approaches? I would love to read them. 

Many Thanks,
Best Regards,
Carlo

 



On 7 Nov 2016, at 16:27, Masood Krohy  wrote:

Yes, you would want to scale those features before feeding into any 
algorithm, one typical way would be to calculate the average and std for 
each feature, deduct the avg, then divide by std. Dividing by "max - min" 
is also a good option if you're sure there is no outlier shooting up your 
max or lowering your min significantly for each feature. After you have 
scaled each feature, then you can feed the data into the algo for 
training. 

For prediction on new samples, you need to scale each sample first before 
making predictions using your trained model. 

It's not too complicated to implement manually, but Spark API has some 
support for this already: 
ML: http://spark.apache.org/docs/latest/ml-features.html#standardscaler 
MLlib: 
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#standardscaler
 

Masood 


--
Masood Krohy, Ph.D. 
Data Scientist, Intact Lab-R 
Intact Financial Corporation 
http://ca.linkedin.com/in/masoodkh 



From:    Carlo.Allocca 
To:      Masood Krohy 
Cc:      Carlo.Allocca , Mohit Jaggi <mohitja...@gmail.com>, "user@spark.apache.org" 
Date:    2016-11-07 10:50 
Subject: Re: LinearRegressionWithSGD and Rank Features By Importance 




Hi Masood, 

thank you very much for the reply. It is a very good point, as I am getting 
very bad results so far. 

If I understood well, what you suggest is to scale the data below (it is 
part of my dataset) before applying linear regression with SGD. 

Is that correct? 

Many Thanks in advance. 

Best Regards, 
Carlo 

 

On 7 Nov 2016, at 15:31, Masood Krohy  wrote: 

If you go down this route (look at actual coefficients/weights), then make 
sure your features are scaled first and have more or less the same mean 
when feeding them into the algo. If not, then actual coefficients/weights 
wouldn't tell you much. In any case, SGD performs badly with unscaled 
features, so you gain if you scale the features beforehand. 
Masood 

--
Masood Krohy, Ph.D. 
Data Scientist, Intact Lab-R 
Intact Financial Corporation 
http://ca.linkedin.com/in/masoodkh 



From:    Carlo.Allocca 
To:      Mohit Jaggi 
Cc:      Carlo.Allocca , "user@spark.apache.org" 
Date:    

use case reading files split per id

2016-11-08 Thread ruben
Hey,

We have files organized on hdfs in this manner:

base_folder
|- 
|- file1
|- file2
|- ...
|- 
|- file1
|- file2
|- ...
| - ...

We want to be able to do the following operation on our data:

- for each ID we want to parse the lines into records (timestamp, record
data), giving us a list[(timestamp, record_data)] for each ID
- then we want to do an operation on list[(timestamp, record_data)], giving
us a list[output], note that this operation is not a simple map operation
(timestamp, record_data) -> output, but it requires to know the full list of
records for an id

Currently we are doing this in the following way:

val ids: List[String] = 
val idsWithPaths: List[(String, List[String])] = 
sc.parallelize(idsWithPaths, partitions)
  .map { case (id, pathList) =>
    val sourceList: List[Source] = 
    val combinedIterator: Iterator = sourceList.map(_.getLines()).reduceLeft(_ ++ _)
    val records: List[(Timestamp, RecordData)] = parseRecords(combinedIterator, id)
    val output: List[Output] = generateOutput(records, id)
  }

I would like to know if this is a good way to do this operation. It seems to
me that it doesn't make full use of the capabilities of spark (data locality
for example, since there is no way for the partitioner to know how to
distribute the ids close to the files on hdfs). Some attempts where made to
translate this using sc.textfile and sc.wholetextfiles but by doing some
small benchmarks it seemed that those were slower (but it could be due to
the specific implementation, since it required some groupByKey/reduceByKey
steps to gather the data for each ID into a list[(timestamp, record_data)]
to be able to do the generateOutput function).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/use-case-reading-files-split-per-id-tp28044.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Live data visualisations with Spark

2016-11-08 Thread Masood Krohy
+1 for Zeppelin.

See 
https://community.hortonworks.com/articles/10365/apache-zeppelin-and-sparkr.html


--
Masood Krohy, Ph.D. 
Data Scientist, Intact Lab-R 
Intact Financial Corporation 
http://ca.linkedin.com/in/masoodkh 



From:    Vadim Semenov 
To:      Andrew Holway 
Cc:      user 
Date:    2016-11-08 11:17
Subject: Re: Live data visualisations with Spark



Take a look at https://zeppelin.apache.org

On Tue, Nov 8, 2016 at 11:13 AM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:
Hello,

A colleague and I are trying to work out the best way to provide live data 
visualisations based on Spark. Is it possible to explore a dataset in 
spark from a web browser? Set up pre defined functions that the user can 
click on which return datsets.

We are using a lot of R here. Is this something that could be accomplished 
with shiny server for instance?

Thanks,

Andrew Holway




Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-08 Thread Arijit
Thanks TD.


Is "hdfs.append.support" a standard configuration? I see a seemingly equivalent 
configuration "dfs.support.append" that is used in our version of HDFS.


In case we want to use a pseudo file-system (like S3) which does not support 
append, what are our options? I am not familiar with the code yet, but is it 
possible to generate a new file whenever a conflict of this sort happens?


Thanks again, Arijit


From: Tathagata Das 
Sent: Monday, November 7, 2016 7:59:06 PM
To: Arijit
Cc: user@spark.apache.org
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent 
failure to WAL

For WAL in Spark to work with HDFS, the HDFS version you are running must 
support file appends. Contact your HDFS package/installation provider to figure 
out whether this is supported by your HDFS installation.

On Mon, Nov 7, 2016 at 2:04 PM, Arijit 
> wrote:

Hello All,


We are using Spark 1.6.2 with WAL enabled and encountering data loss when the 
following exception/warning happens. We are using HDFS as our checkpoint 
directory.


Questions are:


1. Is this a bug in Spark or an issue with our configuration? The source looks like 
the following. Which file already exists, and who is supposed to set the 
hdfs.append.support configuration? Why doesn't it happen all the time?


private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = getFileSystemForPath(dfsPath, conf)
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false) ||
            dfs.isInstanceOf[RawLocalFileSystem]) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }


2. Why does the job not retry and eventually fail when this error occurs? The 
job skips processing the exact number of events dumped in the log. For this 
particular example I see 987 + 4686 events were not processed and are lost 
forever (it does not recover even on restart).



16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to 
write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed 
to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), 
Record(
java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.(FileBasedWriteAheadLogWriter.scala:41)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing 
record: 
BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://
mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597
 to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at 

Re: sanboxing spark executors

2016-11-08 Thread Michael Segel
Not that easy of a problem to solve… 

Can you impersonate the user who provided the code? 

I mean if Joe provides the lambda function, then it runs as Joe so it has joe’s 
permissions. 

Steve is right, you’d have to get down to your cluster’s security and 
authenticate the user before accepting the lambda code. You may also want to 
run with a restricted subset of permissions. 
(e.g. Joe is an admin, but he wants it to run as if its an untrusted user… this 
gets a bit more interesting.) 

And this begs the question… 

How are you sharing your RDDs across multiple users?  This too opens up a 
security question or two… 



> On Nov 4, 2016, at 6:13 PM, blazespinnaker  wrote:
> 
> In particular, we need to make sure the RDDs execute the lambda functions
> securely as they are provided by user code.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/sanboxing-spark-executors-tp28014p28024.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Re: Live data visualisations with Spark

2016-11-08 Thread Vadim Semenov
Take a look at https://zeppelin.apache.org

On Tue, Nov 8, 2016 at 11:13 AM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:

> Hello,
>
> A colleague and I are trying to work out the best way to provide live data
> visualisations based on Spark. Is it possible to explore a dataset in spark
> from a web browser? Set up pre defined functions that the user can click on
> which return datsets.
>
> We are using a lot of R here. Is this something that could be accomplished
> with shiny server for instance?
>
> Thanks,
>
> Andrew Holway
>


Live data visualisations with Spark

2016-11-08 Thread Andrew Holway
Hello,

A colleague and I are trying to work out the best way to provide live data
visualisations based on Spark. Is it possible to explore a dataset in spark
from a web browser? Set up pre defined functions that the user can click on
which return datsets.

We are using a lot of R here. Is this something that could be accomplished
with shiny server for instance?

Thanks,

Andrew Holway


Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-08 Thread Carlo . Allocca
Hi Masood,

Thank you again for your suggestion.
I have got a question about the following:

For prediction on new samples, you need to scale each sample first before 
making predictions using your trained model.


When applying the ML linear model as suggested above, it means that the 
predicted value is scaled. My question: does it need to be scaled back? I mean, 
should I apply the inverse of "calculate the average and std for each feature, 
deduct the avg, then divide by std" to the predicted value, i.e., in practice, 
(predicted-value * std) + avg?

Is that correct? Am I missing anything?

Many Thanks in advance.
Best Regards,
Carlo


On 7 Nov 2016, at 17:14, carlo allocca 
> wrote:

I found it just google 
http://sebastianraschka.com/Articles/2014_about_feature_scaling.html

Thanks.
Carlo
On 7 Nov 2016, at 17:12, carlo allocca 
> wrote:

Hi Masood,

Thank you very much for your insight.
I am going to scale all my features as you described.

As I am a beginner, is there any paper/book that would explain the suggested 
approaches? I would love to read them.

Many Thanks,
Best Regards,
Carlo





On 7 Nov 2016, at 16:27, Masood Krohy 
> wrote:

Yes, you would want to scale those features before feeding into any algorithm, 
one typical way would be to calculate the average and std for each feature, 
deduct the avg, then divide by std. Dividing by "max - min" is also a good 
option if you're sure there is no outlier shooting up your max or lowering your 
min significantly for each feature. After you have scaled each feature, then 
you can feed the data into the algo for training.

For prediction on new samples, you need to scale each sample first before 
making predictions using your trained model.

It's not too complicated to implement manually, but Spark API has some support 
for this already:
ML: http://spark.apache.org/docs/latest/ml-features.html#standardscaler
MLlib: 
http://spark.apache.org/docs/latest/mllib-feature-extraction.html#standardscaler

Masood


--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



From:    Carlo.Allocca 
To:      Masood Krohy 
Cc:      Carlo.Allocca , Mohit Jaggi , "user@spark.apache.org" 
Date:    2016-11-07 10:50
Subject: Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Masood,

thank you very much for the reply. It is a very good point, as I am getting very 
bad results so far.

If I understood well, what you suggest is to scale the data below (it is part of 
my dataset) before applying linear regression with SGD.

Is that correct?

Many Thanks in advance.

Best Regards,
Carlo



On 7 Nov 2016, at 15:31, Masood Krohy 
> wrote:

If you go down this route (look at actual coefficients/weights), then make sure 
your features are scaled first and have more or less the same mean when feeding 
them into the algo. If not, then actual coefficients/weights wouldn't tell you 
much. In any case, SGD performs badly with unscaled features, so you gain if 
you scale the features beforehand.

Masood

--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R
Intact Financial Corporation
http://ca.linkedin.com/in/masoodkh



From:    Carlo.Allocca 
To:      Mohit Jaggi 
Cc:      Carlo.Allocca , "user@spark.apache.org" 
Date:    2016-11-04 03:39
Subject: Re: LinearRegressionWithSGD and Rank Features By Importance





Hi Mohit,

Thank you for your reply.
OK, it means coefficients with a high score are more important than others with a low 
score…

Many Thanks,
Best Regards,
Carlo


> On 3 Nov 2016, at 20:41, Mohit Jaggi 
> > wrote:
>
> For linear regression, it should be fairly easy. Just sort the co-efficients 
> :)
>
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
>
>
>
>
>> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca 
>> > wrote:
>>
>> Hi All,
>>
>> I am using SPARK and in particular the MLib library.
>>
>> import org.apache.spark.mllib.regression.LabeledPoint;
>> import 

Re: TallSkinnyQR

2016-11-08 Thread Iman Mohtashemi
Thanks Sean! Let me take a look!
Iman

On Nov 8, 2016 7:29 AM, "Sean Owen"  wrote:

> I think the problem here is that IndexedRowMatrix.toRowMatrix does *not*
> result in a RowMatrix with rows in order of their indices, necessarily:
>
> // Drop its row indices.
> RowMatrix rowMat = indexedRowMatrix.toRowMatrix();
>
> What you get is a matrix where the rows are arranged in whatever order
> they were passed to IndexedRowMatrix. RowMatrix says it's for rows where
> the ordering doesn't matter, but then it's maybe surprising it has a QR
> decomposition method, because clearly the result depends on the order of
> rows in the input. (CC Yuhao Yang for a comment?)
>
> You could say, well, why doesn't IndexedRowMatrix.toRowMatrix return at
> least something with sorted rows? that would not be hard. It also won't
> return "missing" rows (all zeroes), so it would not in any event result in
> a RowMatrix whose implicit rows and ordering represented the same matrix.
> That, at least, strikes me as something to be better documented.
>
> Maybe it would be nicer still to at least sort the rows, given the
> existence of use cases like yours. For example, at least 
> CoordinateMatrix.toIndexedRowMatrix
> could sort? that is less surprising.
>
> In any event you should be able to make it work by manually getting the
> RDD[IndexedRow] out of IndexedRowMatrix, sorting by index, then mapping it
> to Vectors and making a RowMatrix from it.
>
>
>
> On Tue, Nov 8, 2016 at 2:41 PM Iman Mohtashemi 
> wrote:
>
>> Hi Sean,
>> Here you go:
>>
>> sparsematrix.txt =
>>
>> row, col ,val
>> 0,0,.42
>> 0,1,.28
>> 0,2,.89
>> 1,0,.83
>> 1,1,.34
>> 1,2,.42
>> 2,0,.23
>> 3,0,.42
>> 3,1,.98
>> 3,2,.88
>> 4,0,.23
>> 4,1,.36
>> 4,2,.97
>>
>> The vector is just the third column of the matrix which should give the
>> trivial solution of [0,0,1]
>>
>> This translates to this which is correct
>> There are zeros in the matrix (Not really sparse but just an example)
>> 0.42  0.28  0.89
>> 0.83  0.34  0.42
>> 0.23  0.0   0.0
>> 0.42  0.98  0.88
>> 0.23  0.36  0.97
>>
>>
>> Here is what I get for  the Q and R
>>
>> Q: -0.21470961288429483  0.23590615093828807   0.6784910613691661
>> -0.3920784235278427   -0.06171221388256143  0.5847874866876442
>> -0.7748216464954987   -0.4003560542230838   -0.29392323671555354
>> -0.3920784235278427   0.8517909521421976-0.31435038559403217
>> -0.21470961288429483  -0.23389547730301666  -0.11165321782745863
>> R: -1.0712142642814275  -0.8347536340918976  -1.227672225670157
>> 0.0  0.7662808691141717   0.7553315911660984
>> 0.0  0.0  0.7785210939368136
>>
>> When running this in matlab the numbers are the same but row 1 is the
>> last row and the last row is interchanged with row 3
>>
>>
>>
>> On Mon, Nov 7, 2016 at 11:35 PM Sean Owen  wrote:
>>
>> Rather than post a large section of code, please post a small example of
>> the input matrix and its decomposition, to illustrate what you're saying is
>> out of order.
>>
>> On Tue, Nov 8, 2016 at 3:50 AM im281  wrote:
>>
>> I am getting the correct rows but they are out of order. Is this a bug or
>> am
>> I doing something wrong?
>>
>>
>>


Re: TallSkinnyQR

2016-11-08 Thread Sean Owen
I think the problem here is that IndexedRowMatrix.toRowMatrix does *not*
result in a RowMatrix with rows in order of their indices, necessarily:

// Drop its row indices.
RowMatrix rowMat = indexedRowMatrix.toRowMatrix();

What you get is a matrix where the rows are arranged in whatever order they
were passed to IndexedRowMatrix. RowMatrix says it's for rows where the
ordering doesn't matter, but then it's maybe surprising it has a QR
decomposition method, because clearly the result depends on the order of
rows in the input. (CC Yuhao Yang for a comment?)

You could say, well, why doesn't IndexedRowMatrix.toRowMatrix return at
least something with sorted rows? that would not be hard. It also won't
return "missing" rows (all zeroes), so it would not in any event result in
a RowMatrix whose implicit rows and ordering represented the same matrix.
That, at least, strikes me as something to be better documented.

Maybe it would be nicer still to at least sort the rows, given the
existence of use cases like yours. For example, at least
CoordinateMatrix.toIndexedRowMatrix could sort? that is less surprising.

In any event you should be able to make it work by manually getting the
RDD[IndexedRow] out of IndexedRowMatrix, sorting by index, then mapping it
to Vectors and making a RowMatrix from it.
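
For reference, a minimal sketch of that manual step (the variable name 
indexedRowMatrix is illustrative); note that missing all-zero rows still won't 
be restored:

import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix, RowMatrix}

val orderedRows = indexedRowMatrix.rows
  .sortBy(_.index)   // put the rows back into index order
  .map(_.vector)     // then drop the indices

val rowMat = new RowMatrix(orderedRows)
val qr = rowMat.tallSkinnyQR(computeQ = true)   // qr.Q and qr.R now reflect the original row order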



On Tue, Nov 8, 2016 at 2:41 PM Iman Mohtashemi 
wrote:

> Hi Sean,
> Here you go:
>
> sparsematrix.txt =
>
> row, col ,val
> 0,0,.42
> 0,1,.28
> 0,2,.89
> 1,0,.83
> 1,1,.34
> 1,2,.42
> 2,0,.23
> 3,0,.42
> 3,1,.98
> 3,2,.88
> 4,0,.23
> 4,1,.36
> 4,2,.97
>
> The vector is just the third column of the matrix which should give the
> trivial solution of [0,0,1]
>
> This translates to this which is correct
> There are zeros in the matrix (Not really sparse but just an example)
> 0.42  0.28  0.89
> 0.83  0.34  0.42
> 0.23  0.0   0.0
> 0.42  0.98  0.88
> 0.23  0.36  0.97
>
>
> Here is what I get for  the Q and R
>
> Q: -0.21470961288429483  0.23590615093828807   0.6784910613691661
> -0.3920784235278427   -0.06171221388256143  0.5847874866876442
> -0.7748216464954987   -0.4003560542230838   -0.29392323671555354
> -0.3920784235278427   0.8517909521421976-0.31435038559403217
> -0.21470961288429483  -0.23389547730301666  -0.11165321782745863
> R: -1.0712142642814275  -0.8347536340918976  -1.227672225670157
> 0.0  0.7662808691141717   0.7553315911660984
> 0.0  0.0  0.7785210939368136
>
> When running this in matlab the numbers are the same but row 1 is the last
> row and the last row is interchanged with row 3
>
>
>
> On Mon, Nov 7, 2016 at 11:35 PM Sean Owen  wrote:
>
> Rather than post a large section of code, please post a small example of
> the input matrix and its decomposition, to illustrate what you're saying is
> out of order.
>
> On Tue, Nov 8, 2016 at 3:50 AM im281  wrote:
>
> I am getting the correct rows but they are out of order. Is this a bug or
> am
> I doing something wrong?
>
>
>


Re: TallSkinnyQR

2016-11-08 Thread Iman Mohtashemi
So
 b =
0.89
0.42
0.0
0.88
0.97
The solution at the bottom is the solution to Ax = b solved using Gaussian
elimination. I guess another question is, is there another way to solve
this problem? I'm trying to solve the least squares fit with a huge A (5MM
x 1MM)

x = inverse(A-transpose * A) * A-transpose * b

but I didn't see any functions for matrix inversion

I suppose I could use an iterative solver, but I didn't see one either, which
is why I chose the QR decomposition: solve for Q, then Q-transpose*b = d, and
then solve Rx = d, which would give the solution. But I don't think this would
work either, since the matrices are local copies and not RDD data structures.
Any advice would be appreciated...
Iman

P.S. I also looked at the linear regression class in MLlib, but I haven't
seen any examples with a sparse matrix and sparse vectors as the input, just
'Dataset'. If you have a code example of this, that would work.
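
For reference, a minimal sketch of feeding sparse vectors to the ML 
LinearRegression (Spark 2.x ML API and a SparkSession named spark are assumed), 
using the small matrix and b from earlier in the thread; the least-squares 
solution comes out of model.coefficients without forming any explicit inverse:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression

// each row of A becomes a (sparse) feature vector, the matching entry of b is the label
val training = spark.createDataFrame(Seq(
  (0.89, Vectors.sparse(3, Array(0, 1, 2), Array(0.42, 0.28, 0.89))),
  (0.42, Vectors.sparse(3, Array(0, 1, 2), Array(0.83, 0.34, 0.42))),
  (0.0,  Vectors.sparse(3, Array(0),       Array(0.23))),
  (0.88, Vectors.sparse(3, Array(0, 1, 2), Array(0.42, 0.98, 0.88))),
  (0.97, Vectors.sparse(3, Array(0, 1, 2), Array(0.23, 0.36, 0.97)))
)).toDF("label", "features")

val model = new LinearRegression().setFitIntercept(false).fit(training)
println(model.coefficients)   // should come out close to [0, 0, 1] for this data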


On Tue, Nov 8, 2016 at 6:41 AM Iman Mohtashemi 
wrote:

> Hi Sean,
> Here you go:
>
> sparsematrix.txt =
>
> row, col ,val
> 0,0,.42
> 0,1,.28
> 0,2,.89
> 1,0,.83
> 1,1,.34
> 1,2,.42
> 2,0,.23
> 3,0,.42
> 3,1,.98
> 3,2,.88
> 4,0,.23
> 4,1,.36
> 4,2,.97
>
> The vector is just the third column of the matrix which should give the
> trivial solution of [0,0,1]
>
> This translates to this which is correct
> There are zeros in the matrix (Not really sparse but just an example)
> 0.42  0.28  0.89
> 0.83  0.34  0.42
> 0.23  0.0   0.0
> 0.42  0.98  0.88
> 0.23  0.36  0.97
>
>
> Here is what I get for  the Q and R
>
> Q: -0.21470961288429483  0.23590615093828807   0.6784910613691661
> -0.3920784235278427   -0.06171221388256143  0.5847874866876442
> -0.7748216464954987   -0.4003560542230838   -0.29392323671555354
> -0.3920784235278427   0.8517909521421976-0.31435038559403217
> -0.21470961288429483  -0.23389547730301666  -0.11165321782745863
> R: -1.0712142642814275  -0.8347536340918976  -1.227672225670157
> 0.0  0.7662808691141717   0.7553315911660984
> 0.0  0.0  0.7785210939368136
>
> When running this in matlab the numbers are the same but row 1 is the last
> row and the last row is interchanged with row 3
>
>
>
> On Mon, Nov 7, 2016 at 11:35 PM Sean Owen  wrote:
>
> Rather than post a large section of code, please post a small example of
> the input matrix and its decomposition, to illustrate what you're saying is
> out of order.
>
> On Tue, Nov 8, 2016 at 3:50 AM im281  wrote:
>
> I am getting the correct rows but they are out of order. Is this a bug or
> am
> I doing something wrong?
>
>
>


Re: Kafka stream offset management question

2016-11-08 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

specifically

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets

Have you set enable.auto.commit to false?

The new consumer stores offsets in kafka, so the idea of specifically
deleting offsets for that group doesn't really make sense.

In other words

- set enable.auto.commit to false
- use a new group.id
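
For reference, a sketch of those two settings in the 0-10 direct stream 
parameters (the broker address, group id and topic name are placeholders, and 
ssc is the existing StreamingContext):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-new-group",                        // a fresh group has no committed offsets
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)   // nothing is committed unless you do it yourself
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("mytopic"), kafkaParams))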


On Tue, Nov 8, 2016 at 2:21 AM, Haopu Wang  wrote:
> I'm using Kafka direct stream (auto.offset.reset = earliest) and enable
> Spark streaming's checkpoint.
>
>
>
> The application starts and consumes messages correctly. Then I stop the
> application and clean the checkpoint folder.
>
>
>
> I restart the application and expect it to consumes old messages. But it
> doesn't consume any data. And there are logs as below:
>
>
>
>  [org.apache.spark.streaming.kafka010.KafkaRDD] (Executor task
> launch worker-0;) Beginning offset 25 is the same as ending offset skipping
> aa 0
>
>
>
> So I think the offset is stored not only in checkpoint but also in Kafka,
> right?
>
> Is it because I'm using the same group.id? How can I delete the consumer
> group manually?
>
>
>
> Thanks again for any help!
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: TallSkinnyQR

2016-11-08 Thread Iman Mohtashemi
Hi Sean,
Here you go:

sparsematrix.txt =

row, col ,val
0,0,.42
0,1,.28
0,2,.89
1,0,.83
1,1,.34
1,2,.42
2,0,.23
3,0,.42
3,1,.98
3,2,.88
4,0,.23
4,1,.36
4,2,.97

The vector is just the third column of the matrix which should give the
trivial solution of [0,0,1]

This translates to this which is correct
There are zeros in the matrix (Not really sparse but just an example)
0.42  0.28  0.89
0.83  0.34  0.42
0.23  0.0   0.0
0.42  0.98  0.88
0.23  0.36  0.97


Here is what I get for  the Q and R

Q: -0.21470961288429483  0.23590615093828807   0.6784910613691661
-0.3920784235278427   -0.06171221388256143  0.5847874866876442
-0.7748216464954987   -0.4003560542230838   -0.29392323671555354
-0.3920784235278427   0.8517909521421976-0.31435038559403217
-0.21470961288429483  -0.23389547730301666  -0.11165321782745863
R: -1.0712142642814275  -0.8347536340918976  -1.227672225670157
0.0  0.7662808691141717   0.7553315911660984
0.0  0.0  0.7785210939368136

When running this in matlab the numbers are the same but row 1 is the last
row and the last row is interchanged with row 3



On Mon, Nov 7, 2016 at 11:35 PM Sean Owen  wrote:

> Rather than post a large section of code, please post a small example of
> the input matrix and its decomposition, to illustrate what you're saying is
> out of order.
>
> On Tue, Nov 8, 2016 at 3:50 AM im281  wrote:
>
> I am getting the correct rows but they are out of order. Is this a bug or
> am
> I doing something wrong?
>
>
>


Spark streaming uses lesser number of executors

2016-11-08 Thread Aravindh
Hi, I am using Spark Streaming to process some events. It is deployed in
standalone mode with 1 master and 3 workers. I have set the number of cores per
executor to 4 and the total number of cores to 24, which means 6 executors
will be spawned. I have set spread-out to true, so each worker
machine gets 2 executors. My batch interval is 1 second. While running, what I
observe from the event timeline is that only 3 of the executors are being used;
the other 3 are not. As far as I know, there is no parameter in
Spark standalone mode to specify the number of executors directly. How do I make
Spark use all the available executors?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-uses-lesser-number-of-executors-tp28042.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataSet toJson

2016-11-08 Thread Andrés Ivaldi
Ok, digging the code, I find out in the class JacksonGenerator the next
method

private def writeFields(
  row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]):
Unit = {
  var i = 0
  while (i < row.numFields) {
val field = schema(i)
if (!row.isNullAt(i)) {
  gen.writeFieldName(field.name)
  fieldWriters(i).apply(row, i)
}
i += 1
  }
}

So null values are simply skipped; I'll have to rewrite the toJson method to
use my own JacksonGenerator.

Regards.



On Tue, Nov 8, 2016 at 10:06 AM, Andrés Ivaldi  wrote:

> Hello, I'm using Spark 2.0 and the toJson method. I've seen that
> null values are omitted in the JSON record, which is valid, but I need the
> field with null as its value. Is it possible to configure that?
>
> thanks.
>
>


-- 
Ing. Ivaldi Andres


how to write a substring search efficiently?

2016-11-08 Thread Haig Didizian
Hello!

I have two datasets -- one of short strings, one of longer strings. Some of
the longer strings contain the short strings, and I need to identify which.

What I've written is taking forever to run (pushing 11 hours on my quad
core i5 with 12 GB RAM), appearing to be CPU bound. The way I've written it
required me to enable cross joins (spark.sql.crossJoin.enabled = true),
which makes me wonder if there's a better approach.

What I've done looks something like this:


shortStrings
  .as("short")
  .joinWith(longStrings.as("long"),
            $"long.sequence".contains($"short.sequence"))
  .map(joined => {
    StringMatch(joined._1.id, joined._2.id, joined._1.sequence)
  })


The dataset of short strings has about 150k rows, and the dataset of long
strings has about 300k. I assume it needs to do a cartesian product because
of the substring search? Is there a more efficient way to do this?
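
One thing that may help within the same approach: broadcasting the smaller 
dataset so the join becomes a broadcast nested-loop join instead of a shuffled 
cartesian product. It is still O(n*m) string comparisons, but it skips the 
shuffle. A sketch, assuming the ~150k short strings fit comfortably in executor 
memory:

import org.apache.spark.sql.functions.broadcast

val matches = broadcast(shortStrings).as("short")
  .joinWith(longStrings.as("long"),
            $"long.sequence".contains($"short.sequence"))
  .map(joined => StringMatch(joined._1.id, joined._2.id, joined._1.sequence))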

I'd appreciate any pointers or suggestions. Thanks!
Haig


DataSet toJson

2016-11-08 Thread Andrés Ivaldi
Hello, I'm using Spark 2.0 and the toJson method. I've seen that null
values are omitted in the JSON record, which is valid, but I need the field
with null as its value. Is it possible to configure that?

thanks.


Kafka stream offset management question

2016-11-08 Thread Haopu Wang
I'm using Kafka direct stream (auto.offset.reset = earliest) and enable
Spark streaming's checkpoint.

 

The application starts and consumes messages correctly. Then I stop the
application and clean the checkpoint folder.

 

I restart the application and expect it to consumes old messages. But it
doesn't consume any data. And there are logs as below:

 

 [org.apache.spark.streaming.kafka010.KafkaRDD] (Executor task
launch worker-0;) Beginning offset 25 is the same as ending offset
skipping aa 0

 

So I think the offset is stored not only in checkpoint but also in
Kafka, right?

Is it because I'm using the same group.id? How can I delete the consumer
group manually?

 

Thanks again for any help!

 



RE: InvalidClassException when load KafkaDirectStream from checkpoint (Spark 2.0.0)

2016-11-08 Thread Haopu Wang
It turns out to be a bug in application code. Thank you!

 



From: Haopu Wang 
Sent: 4 November 2016 17:23
To: user@spark.apache.org; Cody Koeninger
Subject: InvalidClassException when load KafkaDirectStream from checkpoint 
(Spark 2.0.0)

 

When I load spark checkpoint, I get below error. Do you have any idea? 

Much thanks!

 

*

 

2016-11-04 17:12:19,582 INFO  [org.apache.spark.streaming.CheckpointReader] 
(main;) Checkpoint files found: 
file:/d:/temp/checkpoint/checkpoint-147825070,file:/d:/temp/checkpoint/checkpoint-147825070.bk,file:/d:/temp/checkpoint/checkpoint-147825069,file:/d:/temp/checkpoint/checkpoint-147825069.bk,file:/d:/temp/checkpoint/checkpoint-147825068,file:/d:/temp/checkpoint/checkpoint-147825068.bk,file:/d:/temp/checkpoint/checkpoint-147825067,file:/d:/temp/checkpoint/checkpoint-147825067.bk

2016-11-04 17:12:19,584 INFO  [org.apache.spark.streaming.CheckpointReader] 
(main;) Attempting to load checkpoint from file 
file:/d:/temp/checkpoint/checkpoint-147825070

2016-11-04 17:12:19,640 DEBUG [org.apache.spark.streaming.DStreamGraph] (main;) 
DStreamGraph.readObject used

2016-11-04 17:12:19,661 DEBUG 
[org.apache.spark.streaming.kafka010.DirectKafkaInputDStream] (main;) 
DirectKafkaInputDStream.readObject used

2016-11-04 17:12:19,664 DEBUG 
[org.apache.spark.streaming.dstream.DStreamCheckpointData] (main;) 
DStreamCheckpointData.readObject used

2016-11-04 17:12:19,679 DEBUG 
[org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData]
 (main;) DirectKafkaInputDStreamCheckpointData.readObject used

2016-11-04 17:12:19,685 ERROR [org.apache.spark.util.Utils] (main;) Exception 
encountered

java.io.InvalidClassException: 
scala.collection.convert.Wrappers$MutableMapWrapper; no valid constructor

 at 
java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)

 at 
java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)

 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1772)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)

 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at 
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)

 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:193)

 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply(DStreamGraph.scala:189)

 at 
org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply(DStreamGraph.scala:189)

 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)

 at 
org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:189)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)

 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)

 at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 at